From d85d4f7c527c1ad1b5afb58c55238742632c5946 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 19 Nov 2025 07:58:12 -0800 Subject: [PATCH 1/2] Prefer FixedResultPicker over custom Picker implementations These other implementations pre-date FixedResultPicker, and are no longer needed. gRPC-LB's tests failed when using FixedResultPicker because it didn't transition to READY. This was because GrpclbState.maybeUpdatePicker() didn't consider the ConnectivityState when checking if anything had changed. The old PickFirstLoadBalancer.Picker didn't implement equals() so every update was considered different, ignoring the connectivity state. PickFirstLeafLoadBalancer wasn't impacted because it didn't pick a subchannel when in CONNECTING, and neither did PickFirstLoadBalancer after the first update. This is fixed by gRPC-LB checking the connectivity state. A follow-up will have PickFirstLoadBalancer no longer return the useless subchannel when picking during CONNECTING. --- .../AutoConfiguredLoadBalancerFactory.java | 36 +++-------------- .../java/io/grpc/internal/OobChannel.java | 39 ++----------------- .../internal/PickFirstLeafLoadBalancer.java | 35 +++-------------- .../grpc/internal/PickFirstLoadBalancer.java | 34 +++------------- .../ShufflingPickFirstLoadBalancer.java | 34 +++------------- .../main/java/io/grpc/grpclb/GrpclbState.java | 5 ++- .../java/io/grpc/rls/RlsLoadBalancer.java | 18 +-------- .../io/grpc/rls/CachingRlsLbClientTest.java | 11 ++---- .../OutlierDetectionLoadBalancerTest.java | 8 +--- .../io/grpc/xds/LeastRequestLoadBalancer.java | 4 +- .../xds/LeastRequestLoadBalancerTest.java | 36 ++++++++--------- 11 files changed, 58 insertions(+), 202 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java index a257637de22..6b8537fd658 100644 --- a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java @@ -19,17 +19,15 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; import io.grpc.ChannelLogger.ChannelLogLevel; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.FixedResultPicker; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; -import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.Subchannel; -import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver.ConfigOrError; @@ -110,7 +108,8 @@ Status tryAcceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { defaultProvider = getProviderOrThrow(defaultPolicy, "using default policy"); } catch (PolicyException e) { Status s = Status.INTERNAL.withDescription(e.getMessage()); - helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(s)); + helper.updateBalancingState( + ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(s))); delegate.shutdown(); delegateProvider = null; delegate = new NoopLoadBalancer(); @@ -122,7 +121,8 @@ Status tryAcceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { if (delegateProvider == null || !policySelection.provider.getPolicyName().equals(delegateProvider.getPolicyName())) { - helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker()); + helper.updateBalancingState( + ConnectivityState.CONNECTING, new FixedResultPicker(PickResult.withNoResult())); delegate.shutdown(); delegateProvider = policySelection.provider; LoadBalancer old = delegate; @@ -236,30 +236,4 @@ private PolicyException(String msg) { super(msg); } } - - private static final class EmptyPicker extends SubchannelPicker { - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withNoResult(); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(EmptyPicker.class).toString(); - } - } - - private static final class FailingPicker extends SubchannelPicker { - private final Status failure; - - FailingPicker(Status failure) { - this.failure = failure; - } - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withError(failure); - } - } } diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index 71973ed5d64..b2272a89672 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -38,8 +38,8 @@ import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.FixedResultPicker; import io.grpc.LoadBalancer.PickResult; -import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.ManagedChannel; @@ -182,23 +182,7 @@ public Object getInternalSubchannel() { } }; - final class OobSubchannelPicker extends SubchannelPicker { - final PickResult result = PickResult.withSubchannel(subchannelImpl); - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return result; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(OobSubchannelPicker.class) - .add("result", result) - .toString(); - } - } - - subchannelPicker = new OobSubchannelPicker(); + subchannelPicker = new FixedResultPicker(PickResult.withSubchannel(subchannelImpl)); delayedTransport.reprocess(subchannelPicker); } @@ -270,23 +254,8 @@ void handleSubchannelStateChange(final ConnectivityStateInfo newState) { delayedTransport.reprocess(subchannelPicker); break; case TRANSIENT_FAILURE: - final class OobErrorPicker extends SubchannelPicker { - final PickResult errorResult = PickResult.withError(newState.getStatus()); - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return errorResult; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(OobErrorPicker.class) - .add("errorResult", errorResult) - .toString(); - } - } - - delayedTransport.reprocess(new OobErrorPicker()); + delayedTransport.reprocess( + new FixedResultPicker(PickResult.withError(newState.getStatus()))); break; default: // Do nothing diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index 2689d7d2308..935214a94fd 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -24,7 +24,6 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import io.grpc.Attributes; @@ -164,7 +163,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { if (noOldAddrs) { // Make tests happy; they don't properly assume starting in CONNECTING rawConnectivityState = CONNECTING; - updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); } if (rawConnectivityState == READY) { @@ -237,7 +236,7 @@ public void handleNameResolutionError(Status error) { subchannels.clear(); addressIndex.updateGroups(ImmutableList.of()); rawConnectivityState = TRANSIENT_FAILURE; - updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); + updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); } void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) { @@ -290,7 +289,7 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo case CONNECTING: rawConnectivityState = CONNECTING; - updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); break; case READY: @@ -322,7 +321,7 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo if (isPassComplete()) { rawConnectivityState = TRANSIENT_FAILURE; updateBalancingState(TRANSIENT_FAILURE, - new Picker(PickResult.withError(stateInfo.getStatus()))); + new FixedResultPicker(PickResult.withError(stateInfo.getStatus()))); // Refresh Name Resolution, but only when all 3 conditions are met // * We are at the end of addressIndex @@ -385,11 +384,11 @@ private void updateHealthCheckedState(SubchannelData subchannelData) { updateBalancingState(READY, new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel))); } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) { - updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError( + updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError( subchannelData.healthStateInfo.getStatus()))); } else if (concludedState != TRANSIENT_FAILURE) { updateBalancingState(subchannelData.getHealthState(), - new Picker(PickResult.withNoResult())); + new FixedResultPicker(PickResult.withNoResult())); } } @@ -593,28 +592,6 @@ ConnectivityState getConcludedConnectivityState() { return this.concludedState; } - /** - * No-op picker which doesn't add any custom picking logic. It just passes already known result - * received in constructor. - */ - private static final class Picker extends SubchannelPicker { - private final PickResult result; - - Picker(PickResult result) { - this.result = checkNotNull(result, "result"); - } - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return result; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(Picker.class).add("result", result).toString(); - } - } - /** * Picker that requests connection during the first pick, and returns noResult. */ diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index a23855e67ec..a41ef03fbda 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -22,7 +22,6 @@ import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; -import com.google.common.base.MoreObjects; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; @@ -87,7 +86,8 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. - updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel))); + updateBalancingState( + CONNECTING, new FixedResultPicker(PickResult.withSubchannel(subchannel))); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -105,7 +105,7 @@ public void handleNameResolutionError(Status error) { // NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine // for time being. - updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); + updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); } private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { @@ -139,13 +139,13 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo case CONNECTING: // It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave // the current picker in-place. But ignoring the potential optimization is simpler. - picker = new Picker(PickResult.withNoResult()); + picker = new FixedResultPicker(PickResult.withNoResult()); break; case READY: - picker = new Picker(PickResult.withSubchannel(subchannel)); + picker = new FixedResultPicker(PickResult.withSubchannel(subchannel)); break; case TRANSIENT_FAILURE: - picker = new Picker(PickResult.withError(stateInfo.getStatus())); + picker = new FixedResultPicker(PickResult.withError(stateInfo.getStatus())); break; default: throw new IllegalArgumentException("Unsupported state:" + newState); @@ -173,28 +173,6 @@ public void requestConnection() { } } - /** - * No-op picker which doesn't add any custom picking logic. It just passes already known result - * received in constructor. - */ - private static final class Picker extends SubchannelPicker { - private final PickResult result; - - Picker(PickResult result) { - this.result = checkNotNull(result, "result"); - } - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return result; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(Picker.class).add("result", result).toString(); - } - } - /** Picker that requests connection during the first pick, and returns noResult. */ private final class RequestConnectionPicker extends SubchannelPicker { private final AtomicBoolean connectionRequested = new AtomicBoolean(false); diff --git a/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java b/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java index b49c856c4d0..4715b551524 100644 --- a/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java +++ b/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java @@ -92,7 +92,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { }); this.subchannel = subchannel; - helper.updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); + helper.updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -107,7 +107,8 @@ public void handleNameResolutionError(Status error) { subchannel.shutdown(); subchannel = null; } - helper.updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); + helper.updateBalancingState( + TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); } private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { @@ -125,13 +126,13 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo picker = new RequestConnectionPicker(); break; case CONNECTING: - picker = new Picker(PickResult.withNoResult()); + picker = new FixedResultPicker(PickResult.withNoResult()); break; case READY: - picker = new Picker(PickResult.withSubchannel(subchannel)); + picker = new FixedResultPicker(PickResult.withSubchannel(subchannel)); break; case TRANSIENT_FAILURE: - picker = new Picker(PickResult.withError(stateInfo.getStatus())); + picker = new FixedResultPicker(PickResult.withError(stateInfo.getStatus())); break; default: throw new IllegalArgumentException("Unsupported state:" + currentState); @@ -154,29 +155,6 @@ public void requestConnection() { } } - /** - * No-op picker which doesn't add any custom picking logic. It just passes already known result - * received in constructor. - */ - private static final class Picker extends SubchannelPicker { - - private final PickResult result; - - Picker(PickResult result) { - this.result = checkNotNull(result, "result"); - } - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return result; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(Picker.class).add("result", result).toString(); - } - } - /** * Picker that requests connection during the first pick, and returns noResult. */ diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 7ca20d58bce..2fc2a492ac8 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -187,6 +187,7 @@ enum Mode { private List dropList = Collections.emptyList(); // Contains only non-drop, i.e., backends from the round-robin list from the balancer. private List backendList = Collections.emptyList(); + private ConnectivityState currentState = ConnectivityState.CONNECTING; private RoundRobinPicker currentPicker = new RoundRobinPicker(Collections.emptyList(), Arrays.asList(BUFFER_ENTRY)); private boolean requestConnectionPending; @@ -937,10 +938,12 @@ private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) // Discard the new picker if we are sure it won't make any difference, in order to save // re-processing pending streams, and avoid unnecessary resetting of the pointer in // RoundRobinPicker. - if (picker.dropList.equals(currentPicker.dropList) + if (state.equals(currentState) + && picker.dropList.equals(currentPicker.dropList) && picker.pickList.equals(currentPicker.pickList)) { return; } + currentState = state; currentPicker = picker; helper.updateBalancingState(state, picker); } diff --git a/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java b/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java index 6e59e867e32..848199f50a8 100644 --- a/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java +++ b/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java @@ -19,7 +19,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; import io.grpc.ChannelLogger; import io.grpc.ChannelLogger.ChannelLogLevel; import io.grpc.ConnectivityState; @@ -93,27 +92,14 @@ public void requestConnection() { @Override public void handleNameResolutionError(final Status error) { - class ErrorPicker extends SubchannelPicker { - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withError(error); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("error", error) - .toString(); - } - } - if (routeLookupClient != null) { logger.log(ChannelLogLevel.DEBUG, "closing the routeLookupClient on a name resolution error"); routeLookupClient.close(); routeLookupClient = null; lbPolicyConfiguration = null; } - helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker()); + helper.updateBalancingState( + ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); } @Override diff --git a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java index 12483d60794..b349aecdbf3 100644 --- a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java +++ b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java @@ -1025,14 +1025,9 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { @Override public void handleNameResolutionError(final Status error) { - class ErrorPicker extends SubchannelPicker { - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withError(error); - } - } - - helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker()); + helper.updateBalancingState( + ConnectivityState.TRANSIENT_FAILURE, + new FixedResultPicker(PickResult.withError(error))); } @Override diff --git a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java index c4eb4c7bae5..10436407422 100644 --- a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java @@ -54,7 +54,6 @@ import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock.ScheduledTask; -import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.internal.TestUtils.StandardLoadBalancerProvider; import io.grpc.util.OutlierDetectionLoadBalancer.EndpointTracker; import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; @@ -568,9 +567,7 @@ public void successRateOneOutlier_configChange() { loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, servers)); - // The PickFirstLeafLB has an extra level of indirection because of health - int expectedStateChanges = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 8 : 12; - generateLoad(ImmutableMap.of(subchannel2, Status.DEADLINE_EXCEEDED), expectedStateChanges); + generateLoad(ImmutableMap.of(subchannel2, Status.DEADLINE_EXCEEDED), 8); // Move forward in time to a point where the detection timer has fired. forwardTime(config); @@ -604,8 +601,7 @@ public void successRateOneOutlier_unejected() { assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); // Now we produce more load, but the subchannel has started working and is no longer an outlier. - int expectedStateChanges = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 8 : 12; - generateLoad(ImmutableMap.of(), expectedStateChanges); + generateLoad(ImmutableMap.of(), 8); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.maxEjectionTimeNanos + 1, TimeUnit.NANOSECONDS); diff --git a/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java b/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java index dda2ad177e6..1f23f2a4af5 100644 --- a/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java @@ -54,7 +54,7 @@ final class LeastRequestLoadBalancer extends MultiChildLoadBalancer { private final ThreadSafeRandom random; - private SubchannelPicker currentPicker = new EmptyPicker(); + private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); private int choiceCount = DEFAULT_CHOICE_COUNT; LeastRequestLoadBalancer(Helper helper) { @@ -113,7 +113,7 @@ protected void updateOverallBalancingState() { } } if (isConnecting) { - updateBalancingState(CONNECTING, new EmptyPicker()); + updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); } else { // Give it all the failing children and let it randomly pick among them updateBalancingState(TRANSIENT_FAILURE, diff --git a/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java index 6fb6507fa4e..302faed95a4 100644 --- a/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java @@ -22,6 +22,7 @@ import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.LoadBalancerMatchers.pickerReturns; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -50,6 +51,7 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.CreateSubchannelArgs; +import io.grpc.LoadBalancer.FixedResultPicker; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; @@ -62,7 +64,6 @@ import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.util.AbstractTestHelper; import io.grpc.util.MultiChildLoadBalancer.ChildLbState; -import io.grpc.xds.LeastRequestLoadBalancer.EmptyPicker; import io.grpc.xds.LeastRequestLoadBalancer.LeastRequestConfig; import io.grpc.xds.LeastRequestLoadBalancer.LeastRequestLbState; import io.grpc.xds.LeastRequestLoadBalancer.ReadyPicker; @@ -238,7 +239,8 @@ public void pickAfterStateChange() throws Exception { ChildLbState childLbState = loadBalancer.getChildLbStates().iterator().next(); Subchannel subchannel = getSubchannel(servers.get(0)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(childLbState.getCurrentState()).isEqualTo(CONNECTING); deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); @@ -251,7 +253,8 @@ public void pickAfterStateChange() throws Exception { assertThat(childLbState.getCurrentState()).isEqualTo(TRANSIENT_FAILURE); assertThat(childLbState.getCurrentPicker().toString()).contains(error.toString()); refreshInvokedAndUpdateBS(inOrder, CONNECTING); - assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class); + assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs)) + .isEqualTo(PickResult.withNoResult()); deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(helper).refreshNameResolution(); @@ -302,7 +305,8 @@ public void ignoreShutdownSubchannelStateChange() { ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY) .build()); assertThat(addressesAcceptanceStatus.isOk()).isTrue(); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); List savedSubchannels = new ArrayList<>(subchannels.values()); loadBalancer.shutdown(); @@ -324,7 +328,8 @@ public void stayTransientFailureUntilReady() { .build()); assertThat(addressesAcceptanceStatus.isOk()).isTrue(); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); // Simulate state transitions for each subchannel individually. List children = new ArrayList<>(loadBalancer.getChildLbStates()); @@ -384,7 +389,8 @@ public void refreshNameResolutionWhenSubchannelConnectionBroken() { assertThat(addressesAcceptanceStatus.isOk()).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); // Simulate state transitions for each subchannel individually. for (Subchannel sc : subchannels.values()) { @@ -399,7 +405,8 @@ public void refreshNameResolutionWhenSubchannelConnectionBroken() { deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(helper).refreshNameResolution(); verify(sc, times(2)).requestConnection(); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); } AbstractTestHelper.verifyNoMoreMeaningfulInteractions(helper); @@ -469,14 +476,6 @@ public void pickerLeastRequest() throws Exception { assertEquals(0, ((LeastRequestLbState) childLbStates.get(0)).getActiveRequests()); } - @Test - public void pickerEmptyList() throws Exception { - SubchannelPicker picker = new EmptyPicker(); - - assertNull(picker.pickSubchannel(mockArgs).getSubchannel()); - assertEquals(Status.OK, picker.pickSubchannel(mockArgs).getStatus()); - } - @Test public void nameResolutionErrorWithNoChannels() throws Exception { Status error = Status.NOT_FOUND.withDescription("nameResolutionError"); @@ -554,7 +553,8 @@ public void subchannelStateIsolation() throws Exception { Iterator pickers = pickerCaptor.getAllValues().iterator(); // The picker is incrementally updated as subchannels become READY assertEquals(CONNECTING, stateIterator.next()); - assertThat(pickers.next()).isInstanceOf(EmptyPicker.class); + assertThat(pickers.next().pickSubchannel(mockArgs)) + .isEqualTo(PickResult.withNoResult()); assertEquals(READY, stateIterator.next()); assertThat(getList(pickers.next())).containsExactly(sc1); assertEquals(READY, stateIterator.next()); @@ -585,8 +585,8 @@ public void readyPicker_emptyList() { @Test public void internalPickerComparisons() { - EmptyPicker empty1 = new EmptyPicker(); - EmptyPicker empty2 = new EmptyPicker(); + FixedResultPicker empty1 = new FixedResultPicker(PickResult.withNoResult()); + FixedResultPicker empty2 = new FixedResultPicker(PickResult.withNoResult()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); From c486dd053cb2daacf1c38bb21969ba7f3a3f73e6 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 5 Jan 2026 10:52:50 -0800 Subject: [PATCH 2/2] core: PickFirstLB should not return a subchannel during CONNECTING This is a follow-up from noticing a breakage in gRPC-LB because it wasn't checking if the connectivity state changed even if the picker was identical. lb_serverStatusCodeConversion() has been misleading since 42e1829b37 ("xds: Do RLS fallback policy eagar start"). At that point, the subchannel it marked as READY was for the default target's policy, not the policy for wilderness. However, since old PF policy provided a subchannel when CONNECTING, everything was "fine", but RLS would mistakenly count toward target_picks. This demonstrates that RLS target_picks has been broken since it was introduced for PF, as PF relied on the caller to avoid the picker when it was CONNECTING. This may have been hard to notice in production, as the metrics become correct as soon as the connection is established, so as long as you use the channel for a while, the duplicate counting would become a small percentage of the overall amount. --- .../io/grpc/internal/PickFirstLoadBalancer.java | 3 +-- .../grpc/internal/PickFirstLoadBalancerTest.java | 11 +++++------ .../java/io/grpc/rls/RlsLoadBalancerTest.java | 16 +++++++--------- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index a41ef03fbda..aa8b5a7e9a9 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -86,8 +86,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. - updateBalancingState( - CONNECTING, new FixedResultPicker(PickResult.withSubchannel(subchannel))); + updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 3e0258f2e40..819293e070b 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -219,7 +219,7 @@ public void refreshNameResolutionAfterSubchannelConnectionBroken() { inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse(); inOrder.verify(mockSubchannel).requestConnection(); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); @@ -278,7 +278,7 @@ public void pickAfterResolvedAndChanged() throws Exception { assertThat(args.getAddresses()).isEqualTo(servers); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel).requestConnection(); - assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse(); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); @@ -300,7 +300,7 @@ public void pickAfterStateChangeAfterResolution() throws Exception { verify(mockSubchannel).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel(); + assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse(); reset(mockHelper); when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); @@ -317,7 +317,7 @@ public void pickAfterStateChangeAfterResolution() throws Exception { stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); - assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care verifyNoMoreInteractions(mockHelper); @@ -405,8 +405,7 @@ public void nameResolutionSuccessAfterError() throws Exception { inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel).requestConnection(); - assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs) - .getSubchannel()); + assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse(); assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), pickerCaptor.getValue().pickSubchannel(mockArgs)); diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 8d16d1bd74c..188c99bcd5a 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -72,7 +72,6 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.FakeClock; import io.grpc.internal.JsonParser; -import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.internal.PickSubchannelArgsImpl; import io.grpc.internal.testing.StreamRecorder; import io.grpc.lookup.v1.RouteLookupServiceGrpc; @@ -212,12 +211,14 @@ public void lb_serverStatusCodeConversion() throws Exception { throw new RuntimeException(e); } }); + assertThat(subchannels.poll()).isNotNull(); // default target + assertThat(subchannels.poll()).isNull(); + // Warm-up pick; will be queued InOrder inOrder = inOrder(helper); inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); PickSubchannelArgs fakeSearchMethodArgs = newPickSubchannelArgs(fakeSearchMethod); - // Warm-up pick; will be queued PickResult res = picker.pickSubchannel(fakeSearchMethodArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); @@ -230,8 +231,7 @@ public void lb_serverStatusCodeConversion() throws Exception { subchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); res = picker.pickSubchannel(fakeSearchMethodArgs); assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK); - int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; - verifyLongCounterAdd("grpc.lb.rls.target_picks", expectedTimes, 1, "wilderness", "complete"); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete"); // Check on conversion Throwable cause = new Throwable("cause"); @@ -284,8 +284,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { res = picker.pickSubchannel(searchSubchannelArgs); assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); assertThat(res.getSubchannel()).isSameInstanceAs(searchSubchannel); - int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; - verifyLongCounterAdd("grpc.lb.rls.target_picks", expectedTimes, 1, "wilderness", "complete"); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete"); // rescue should be pending status although the overall channel state is READY res = picker.pickSubchannel(rescueSubchannelArgs); @@ -431,7 +430,7 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception { inOrder.verify(helper).getMetricRecorder(); inOrder.verify(helper).getChannelTarget(); inOrder.verifyNoMoreInteractions(); - int times = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; + int times = 1; verifyLongCounterAdd("grpc.lb.rls.default_target_picks", times, 1, "defaultTarget", "complete"); @@ -536,8 +535,7 @@ public void lb_working_withoutDefaultTarget() throws Exception { res = picker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod)); assertThat(res.getStatus().isOk()).isFalse(); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); - int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; - verifyLongCounterAdd("grpc.lb.rls.target_picks", expectedTimes, 1, "wilderness", "complete"); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete"); res = picker.pickSubchannel(newPickSubchannelArgs(fakeRescueMethod)); assertThat(subchannelIsReady(res.getSubchannel())).isTrue();