From 78e8af4a280120386ab5ce0973c4884a47830211 Mon Sep 17 00:00:00 2001 From: becomeStar Date: Fri, 31 Oct 2025 13:24:05 +0900 Subject: [PATCH 1/3] binder: fix race between newStream() and unregisterInbound() by synchronizing in-use updates Previously, concurrent calls to newStream() and unregisterInbound() could both update numInUseStreams and invoke transportInUse() in conflicting order, leading to inconsistent listener state. This change synchronizes updates and only notifies the listener on transitions between 0 and >0. Fixes #10917 --- .../internal/BinderClientTransport.java | 44 ++++++++++++++----- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java index bef1eefd43e..7a87452244c 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java @@ -60,7 +60,7 @@ import io.grpc.internal.StatsTraceContext; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicInteger; + import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -79,7 +79,12 @@ public final class BinderClientTransport extends BinderTransport private final ClientHandshake handshake; /** Number of ongoing calls which keep this transport "in-use". */ - private final AtomicInteger numInUseStreams; + @GuardedBy("this") + private int numInUseStreams; + + /** Last in-use state that was reported to the listener */ + @GuardedBy("this") + private boolean listenerInUse; private final long readyTimeoutMillis; private final PingTracker pingTracker; @@ -120,9 +125,11 @@ public BinderClientTransport( Boolean preAuthServerOverride = options.getEagAttributes().get(PRE_AUTH_SERVER_OVERRIDE); this.preAuthorizeServer = preAuthServerOverride != null ? preAuthServerOverride : factory.preAuthorizeServers; + this.handshake = factory.useLegacyAuthStrategy ? new LegacyClientHandshake() : new V2ClientHandshake(); - numInUseStreams = new AtomicInteger(); + numInUseStreams = 0; + listenerInUse = false; pingTracker = new PingTracker(Ticker.systemTicker(), (id) -> sendPing(id)); serviceBinding = new ServiceBinding( @@ -266,9 +273,7 @@ public synchronized ClientStream newStream( return newFailingClientStream(failure, attributes, headers, tracers); } - if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) { - clientTransportListener.transportInUse(true); - } + updateInUseStreamsIfNeed(inbound.countsForInUse(), 1); Outbound.ClientOutbound outbound = new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext); if (method.getType().clientSendsOneMessage()) { @@ -280,9 +285,7 @@ public synchronized ClientStream newStream( @Override protected void unregisterInbound(Inbound inbound) { - if (inbound.countsForInUse() && numInUseStreams.decrementAndGet() == 0) { - clientTransportListener.transportInUse(false); - } + updateInUseStreamsIfNeed(inbound.countsForInUse(), -1); super.unregisterInbound(inbound); } @@ -312,7 +315,9 @@ void notifyShutdown(Status status) { @Override @GuardedBy("this") void notifyTerminated() { - if (numInUseStreams.getAndSet(0) > 0) { + if(numInUseStreams > 0) { + numInUseStreams = 0; + listenerInUse = false; clientTransportListener.transportInUse(false); } if (readyTimeoutFuture != null) { @@ -453,6 +458,25 @@ private synchronized void handleAuthResult(Throwable t) { Status.INTERNAL.withDescription("Could not evaluate SecurityPolicy").withCause(t), true); } + /** Updates in-use-stream count and notifies listener only on transitions between 0 and >0 */ + private synchronized void updateInUseStreamsIfNeed(boolean countsForInUse, int delta) { + if(!countsForInUse) { + return; + } + + numInUseStreams += delta; + if(numInUseStreams < 0) { + // Defensive: prevent negative due to unexpected double-decrement + numInUseStreams = 0; + } + + boolean nowInUseStream = numInUseStreams > 0; + if(nowInUseStream != listenerInUse) { + listenerInUse = nowInUseStream; + clientTransportListener.transportInUse(nowInUseStream); + } + } + @GuardedBy("this") @Override protected void handlePingResponse(Parcel parcel) { From f00ff974116876ed8cdc875c354ca6a0e80a6d2d Mon Sep 17 00:00:00 2001 From: becomeStar Date: Thu, 6 Nov 2025 00:27:53 +0900 Subject: [PATCH 2/3] fix: avoid potential deadlock by scheduling listener notifications outside transport lock --- .../internal/BinderClientTransport.java | 89 ++++++++++++++----- 1 file changed, 66 insertions(+), 23 deletions(-) diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java index 7a87452244c..47e9ccd06e9 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java @@ -25,8 +25,11 @@ import android.os.IBinder; import android.os.Parcel; import android.os.Process; + import androidx.annotation.BinderThread; import androidx.annotation.MainThread; + +import com.google.common.base.Preconditions; import com.google.common.base.Ticker; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -60,6 +63,8 @@ import io.grpc.internal.StatsTraceContext; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -79,12 +84,13 @@ public final class BinderClientTransport extends BinderTransport private final ClientHandshake handshake; /** Number of ongoing calls which keep this transport "in-use". */ - @GuardedBy("this") - private int numInUseStreams; + private final AtomicInteger numInUseStreams; /** Last in-use state that was reported to the listener */ - @GuardedBy("this") - private boolean listenerInUse; + private final AtomicBoolean listenerInUse; + + /** Synchronizes transport listener callbacks */ + private final Object listenerNotifyLock; private final long readyTimeoutMillis; private final PingTracker pingTracker; @@ -125,11 +131,11 @@ public BinderClientTransport( Boolean preAuthServerOverride = options.getEagAttributes().get(PRE_AUTH_SERVER_OVERRIDE); this.preAuthorizeServer = preAuthServerOverride != null ? preAuthServerOverride : factory.preAuthorizeServers; - this.handshake = factory.useLegacyAuthStrategy ? new LegacyClientHandshake() : new V2ClientHandshake(); - numInUseStreams = 0; - listenerInUse = false; + this.numInUseStreams = new AtomicInteger(); + this.listenerInUse = new AtomicBoolean(); + this.listenerNotifyLock = new Object(); pingTracker = new PingTracker(Ticker.systemTicker(), (id) -> sendPing(id)); serviceBinding = new ServiceBinding( @@ -273,7 +279,7 @@ public synchronized ClientStream newStream( return newFailingClientStream(failure, attributes, headers, tracers); } - updateInUseStreamsIfNeed(inbound.countsForInUse(), 1); + updateInUseStreamsCountIfNeeded(inbound.countsForInUse(), 1); Outbound.ClientOutbound outbound = new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext); if (method.getType().clientSendsOneMessage()) { @@ -285,7 +291,7 @@ public synchronized ClientStream newStream( @Override protected void unregisterInbound(Inbound inbound) { - updateInUseStreamsIfNeed(inbound.countsForInUse(), -1); + updateInUseStreamsCountIfNeeded(inbound.countsForInUse(), -1); super.unregisterInbound(inbound); } @@ -315,9 +321,8 @@ void notifyShutdown(Status status) { @Override @GuardedBy("this") void notifyTerminated() { - if(numInUseStreams > 0) { - numInUseStreams = 0; - listenerInUse = false; + if (numInUseStreams.getAndSet(0) > 0) { + listenerInUse.set(false); clientTransportListener.transportInUse(false); } if (readyTimeoutFuture != null) { @@ -458,25 +463,63 @@ private synchronized void handleAuthResult(Throwable t) { Status.INTERNAL.withDescription("Could not evaluate SecurityPolicy").withCause(t), true); } - /** Updates in-use-stream count and notifies listener only on transitions between 0 and >0 */ - private synchronized void updateInUseStreamsIfNeed(boolean countsForInUse, int delta) { - if(!countsForInUse) { + /** + * Updates in-use stream count and notifies listener only on transitions between 0 and >0, without + * acquiring the transport lock. + */ + private void updateInUseStreamsCountIfNeeded(boolean countsForInUse, int delta) { + Preconditions.checkArgument(delta == -1 || delta == 1, "stream count delta must be -1 or +1"); + if (!countsForInUse) { return; } + int prev, next; - numInUseStreams += delta; - if(numInUseStreams < 0) { - // Defensive: prevent negative due to unexpected double-decrement - numInUseStreams = 0; + if (delta > 0) { + next = numInUseStreams.incrementAndGet(); + prev = next - 1; + } else { + prev = numInUseStreams.get(); + int updated; + + while (true) { + int current = prev; + int newValue = current > 0 ? current - 1 : 0; + if (numInUseStreams.compareAndSet(current, newValue)) { + updated = newValue; + break; + } + prev = numInUseStreams.get(); + } + next = updated; } - boolean nowInUseStream = numInUseStreams > 0; - if(nowInUseStream != listenerInUse) { - listenerInUse = nowInUseStream; - clientTransportListener.transportInUse(nowInUseStream); + boolean prevInUse = prev > 0; + boolean nextInUse = next > 0; + + if (prevInUse != nextInUse) { + if (listenerInUse.compareAndSet(prevInUse, nextInUse)) { + scheduleTransportInUseNotification(nextInUse); + } } } + private void scheduleTransportInUseNotification(final boolean inUse) { + getScheduledExecutorService() + .execute( + new Runnable() { + @Override + public void run() { + // Provide external synchronization as required by Listener contract, + // without taking the transport lock to avoid potential deadlocks. + synchronized (listenerNotifyLock) { + if (listenerInUse.get() == inUse) { + clientTransportListener.transportInUse(inUse); + } + } + } + }); + } + @GuardedBy("this") @Override protected void handlePingResponse(Parcel parcel) { From 1412181348751ca987f24306e9c3d287961d8b57 Mon Sep 17 00:00:00 2001 From: becomeStar Date: Mon, 17 Nov 2025 23:37:28 +0900 Subject: [PATCH 3/3] binder: make Listener callbacks mutually exclusive and fix in-use race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Serialize transportShutdown/transportTerminated/transportReady/transportInUse under listener lock - Atomic in-use counter with reconcile to prevent incorrect false when −1/+1 race occurs - Dispatch callbacks asynchronously to avoid lock-order deadlocks - Behavior unchanged for users; improves correctness under concurrency --- .../internal/BinderClientTransport.java | 103 ++++++++++++++---- 1 file changed, 80 insertions(+), 23 deletions(-) diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java index 47e9ccd06e9..31cb44e77f5 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java @@ -86,10 +86,10 @@ public final class BinderClientTransport extends BinderTransport /** Number of ongoing calls which keep this transport "in-use". */ private final AtomicInteger numInUseStreams; - /** Last in-use state that was reported to the listener */ + /** Last in-use state reported to the transport listener */ private final AtomicBoolean listenerInUse; - /** Synchronizes transport listener callbacks */ + /** Serializes transport listener callbacks */ private final Object listenerNotifyLock; private final long readyTimeoutMillis; @@ -315,22 +315,39 @@ public synchronized void shutdownNow(Status reason) { @Override @GuardedBy("this") void notifyShutdown(Status status) { - clientTransportListener.transportShutdown(status, SimpleDisconnectError.UNKNOWN); + // Defer listener invocation to the listener executor to avoid calling + // external code while holding the transport lock. + scheduleOnListener( + new Runnable() { + @Override + public void run() { + clientTransportListener.transportShutdown(status, SimpleDisconnectError.UNKNOWN); + } + }); } @Override @GuardedBy("this") void notifyTerminated() { if (numInUseStreams.getAndSet(0) > 0) { - listenerInUse.set(false); - clientTransportListener.transportInUse(false); + if (listenerInUse.compareAndSet(true, false)) { + scheduleTransportInUseNotification(false); + } else { + listenerInUse.set(false); + } } if (readyTimeoutFuture != null) { readyTimeoutFuture.cancel(false); readyTimeoutFuture = null; } serviceBinding.unbind(); - clientTransportListener.transportTerminated(); + scheduleOnListener( + new Runnable() { + @Override + public void run() { + clientTransportListener.transportTerminated(); + } + }); } @Override @@ -450,8 +467,11 @@ public void handleSetupTransport() { @GuardedBy("this") private void onHandshakeComplete() { setState(TransportState.READY); - attributes = clientTransportListener.filterTransport(attributes); - clientTransportListener.transportReady(); + final Attributes currentAttrs = attributes; + // Defer listener callbacks (filterTransport and transportReady) to the listener executor + // to avoid invoking listener code while holding the transport lock. + scheduleFilterTransportAndReady(currentAttrs); + if (readyTimeoutFuture != null) { readyTimeoutFuture.cancel(false); readyTimeoutFuture = null; @@ -464,42 +484,40 @@ private synchronized void handleAuthResult(Throwable t) { } /** - * Updates in-use stream count and notifies listener only on transitions between 0 and >0, without - * acquiring the transport lock. + * Updates the in-use stream count and triggers reconciliation of the listener in-use state, + * without acquiring the transport lock. */ private void updateInUseStreamsCountIfNeeded(boolean countsForInUse, int delta) { Preconditions.checkArgument(delta == -1 || delta == 1, "stream count delta must be -1 or +1"); if (!countsForInUse) { return; } - int prev, next; if (delta > 0) { - next = numInUseStreams.incrementAndGet(); - prev = next - 1; + numInUseStreams.incrementAndGet(); } else { - prev = numInUseStreams.get(); - int updated; + // Decrement with floor at 0 + int prev = numInUseStreams.get(); while (true) { int current = prev; int newValue = current > 0 ? current - 1 : 0; if (numInUseStreams.compareAndSet(current, newValue)) { - updated = newValue; break; } prev = numInUseStreams.get(); } - next = updated; } + reconcileInUseState(); + } - boolean prevInUse = prev > 0; - boolean nextInUse = next > 0; + /** Reconcile listenerInUse with the current stream count to avoid stale toggles under races. */ + private void reconcileInUseState() { + boolean nowInUse = numInUseStreams.get() > 0; + boolean prev = listenerInUse.get(); - if (prevInUse != nextInUse) { - if (listenerInUse.compareAndSet(prevInUse, nextInUse)) { - scheduleTransportInUseNotification(nextInUse); - } + if(prev != nowInUse && listenerInUse.compareAndSet(prev, nowInUse)) { + scheduleTransportInUseNotification(nowInUse); } } @@ -520,6 +538,45 @@ public void run() { }); } + private void scheduleFilterTransportAndReady(final Attributes attrsSnapshot) { + getScheduledExecutorService() + .execute( + new Runnable() { + @Override + public void run() { + final Attributes filtered; + synchronized (listenerNotifyLock) { + filtered = clientTransportListener.filterTransport(attrsSnapshot); + } + + synchronized (BinderClientTransport.class) { + attributes = filtered; + } + + scheduleOnListener( + new Runnable() { + @Override + public void run() { + clientTransportListener.transportReady(); + } + }); + } + }); + } + + private void scheduleOnListener(final Runnable task) { + getScheduledExecutorService() + .execute( + new Runnable() { + @Override + public void run() { + synchronized (listenerNotifyLock) { + task.run(); + } + } + }); + } + @GuardedBy("this") @Override protected void handlePingResponse(Parcel parcel) {