8000 xds: Fix load reporting when pick first is used for locality-routing. by DNVindhya · Pull Request #11495 · grpc/grpc-java · GitHub
[go: up one dir, main page]

Skip to content

xds: Fix load reporting when pick first is used for locality-routing. #11495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions api/src/main/java/io/grpc/InternalSubchannelAddressAttributes.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

/**
* An internal class. Do not use.
*
* <p>An interface to provide the attributes for address connected by subchannel.
*/
@Internal
public interface InternalSubchannelAddressAttributes {

/**
* Return attributes of the server address connected by sub channel.
*/
public Attributes getConnectedAddressAttributes();
}
12 changes: 12 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,18 @@
public Object getInternalSubchannel() {
throw new UnsupportedOperationException();
}

/**
* (Internal use only) returns attributes of the address subchannel is connected to.
*
* <p>Warning: this is INTERNAL API, is not supposed to be used by external users, and may
* change without notice. If you think you must use it, please file an issue and we can consider
* removing its "internal" status.
*/
@Internal
public Attributes getConnectedAddressAttributes() {
throw new UnsupportedOperationException();

Check warning on line 1441 in api/src/main/java/io/grpc/LoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

api/src/main/java/io/grpc/LoadBalancer.java#L1441

Added line #L1441 was not covered by tests
}
}

/**
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ protected void handleNotInUse() {

private Status shutdownReason;

private volatile Attributes connectedAddressAttributes;

InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
Expand Down Expand Up @@ -525,6 +527,13 @@ public void run() {
return channelStatsFuture;
}

/**
* Return attributes for server address connected by sub channel.
*/
public Attributes getConnectedAddressAttributes() {
return connectedAddressAttributes;
}

ConnectivityState getState() {
return state.getState();
}
Expand Down Expand Up @@ -568,6 +577,7 @@ public void run() {
} else if (pendingTransport == transport) {
activeTransport = transport;
pendingTransport = null;
connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
gotoNonErrorState(READY);
}
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2044,6 +2044,11 @@ public void updateAddresses(List<EquivalentAddressGroup> addrs) {
subchannel.updateAddresses(addrs);
}

@Override
public Attributes getConnectedAddressAttributes() {
return subchannel.getConnectedAddressAttributes();
}

private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
List<EquivalentAddressGroup> eags) {
List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
Expand Down
26 changes: 26 additions & 0 deletions core/src/test/java/io/grpc/internal/InternalSubchannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,32 @@ public void channelzStatContainsTransport() throws Exception {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2);
}

@Test
public void connectedAddressAttributes_ready() {
SocketAddress addr = new SocketAddress() {};
Attributes attr = Attributes.newBuilder().set(Attributes.Key.create("some-key"), "1").build();
createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addr), attr));

assertEquals(IDLE, internalSubchannel.getState());
assertNoCallbackInvoke();
assertNull(internalSubchannel.obtainActiveTransport());
assertNull(internalSubchannel.getConnectedAddressAttributes());

assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory).newClientTransport(
eq(addr),
eq(createClientTransportOptions().setEagAttributes(attr)),
isA(TransportLogger.class));
assertNull(internalSubchannel.getConnectedAddressAttributes());

internalSubchannel.obtainActiveTransport();
transports.peek().listener.transportReady();
assertExactCallbackInvokes("onStateChange:READY");
assertEquals(READY, internalSubchannel.getState());
assertEquals(attr, internalSubchannel.getConnectedAddressAttributes());
}

/** Create ClientTransportOptions. Should not be reused if it may be mutated. */
private ClientTransportFactory.ClientTransportOptions createClientTransportOptions() {
return new ClientTransportFactory.ClientTransportOptions()
Expand Down
6 changes: 6 additions & 0 deletions util/src/main/java/io/grpc/util/ForwardingSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,17 @@ public Object getInternalSubchannel() {
return delegate().getInternalSubchannel();
}


@Override
public void updateAddresses(List<EquivalentAddressGroup> addrs) {
delegate().updateAddresses(addrs);
}

@Override
public Attributes getConnectedAddressAttributes() {
return delegate().getConnectedAddressAttributes();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
Expand Down
141 changes: 103 additions & 38 deletions xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

/**
Expand All @@ -77,10 +79,8 @@
Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
|| Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));

private static final Attributes.Key<ClusterLocalityStats> ATTR_CLUSTER_LOCALITY_STATS =
Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityStats");
private static final Attributes.Key<String> ATTR_CLUSTER_LOCALITY_NAME =
Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityName");
private static final Attributes.Key<AtomicReference<ClusterLocality>> ATTR_CLUSTER_LOCALITY =
Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality");

private final XdsLogger logger;
private final Helper helper;
Expand Down Expand Up @@ -213,36 +213,45 @@
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
Locality locality = args.getAddresses().get(0).getAttributes().get(
InternalXdsAttributes.ATTR_LOCALITY); // all addresses should be in the same locality
String localityName = args.getAddresses().get(0).getAttributes().get(
InternalXdsAttributes.ATTR_LOCALITY_NAME);
// Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
// attributes with its locality, including endpoints in LOGICAL_DNS clusters.
// In case of not (which really shouldn't), loads are aggregated under an empty locality.
if (locality == null) {
locality = Locality.create("", "", "");
localityName = "";
}
final ClusterLocalityStats localityStats =
(lrsServerInfo == null)
? null
: xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
edsServiceName, locality);

// This value for ClusterLocality is not recommended for general use.
// Currently, we extract locality data from the first address, even before the subchannel is
// READY.
// This is mainly to accommodate scenarios where a Load Balancing API (like "pick first")
// might return the subchannel before it is READY. Typically, we wouldn't report load for such
// selections because the channel will disregard the chosen (not-ready) subchannel.
// However, we needed to ensure this case is handled.
ClusterLocality clusterLocality = createClusterLocalityFromAttributes(
args.getAddresses().get(0).getAttributes());
AtomicReference<ClusterLocality> localityAtomicReference = new AtomicReference<>(
clusterLocality);
Attributes attrs = args.getAttributes().toBuilder()
.set(ATTR_CLUSTER_LOCALITY_STATS, localityStats)
.set(ATTR_CLUSTER_LOCALITY_NAME, localityName)
.set(ATTR_CLUSTER_LOCALITY, localityAtomicReference)
.build();
args = args.toBuilder().setAddresses(addresses).setAttributes(attrs).build();
final Subchannel subchannel = delegate().createSubchannel(args);

return new ForwardingSubchannel() {
@Override
public void start(SubchannelStateListener listener) {
delegate().start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
if (newState.getState().equals(ConnectivityState.READY)) {
// Get locality based on the connected address attributes
ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes(
10000 subchannel.getConnectedAddressAttributes());
ClusterLocality oldClusterLocality = localityAtomicReference
.getAndSet(updatedClusterLocality);
oldClusterLocality.release();
}
listener.onSubchannelState(newState);
}
});
}

@Override
public void shutdown() {
if (localityStats != null) {
localityStats.release();
}
localityAtomicReference.get().release();
delegate().shutdown();
}

Expand Down Expand Up @@ -274,6 +283,28 @@
return newAddresses;
}

private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
Locality locality = addressAttributes.get(InternalXdsAttributes.ATTR_LOCALITY);
String localityName = addressAttributes.get(InternalXdsAttributes.ATTR_LOCALITY_NAME);

// Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
// attributes with its locality, including endpoints in LOGICAL_DNS clusters.
// In case of not (which really shouldn't), loads are aggregated under an empty
// locality.
if (locality == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be, but is possible that locality is set but locality name is null. You should probably have an else clause that does a null check on localityName.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is existing code, and it seems in general code assumes the values to be non-null. I'd much rather we handle that centrally in io.grpc.xds.client.Locality.create() than lots of random not-possible-to-trigger checks. The data is primarily coming from a proto, so it will be "" when unset.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting to move the locality name null check to io.grpc.xds.client.Locality.create()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see now Larry's comment wasn't about the Locality struct, but instead the attribute. That had been discussed when the code was originally introduced:
#11133 (comment)

This PR is likely to be reverted (in some way) later, once the old PF policy goes away. So changes to the existing code are likely to be lost.

locality = Locality.create("", "", "");
localityName = "";

Check warning on line 296 in xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java#L295-L296

Added lines #L295 - L296 were not covered by tests
}

final ClusterLocalityStats localityStats =
(lrsServerInfo == null)
? null
: xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
edsServiceName, locality);

return new ClusterLocality(localityStats, localityName);
}

@Override
protected Helper delegate() {
return helper;
Expand Down Expand Up @@ -361,18 +392,23 @@
"Cluster max concurrent requests limit exceeded"));
}
}
final ClusterLocalityStats stats =
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_STATS);
if (stats != null) {
String localityName =
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_NAME);
args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);

ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
stats, inFlights, result.getStreamTracerFactory());
ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
.newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory);
final AtomicReference<ClusterLocality> clusterLocality =
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY);

if (clusterLocality != null) {
ClusterLocalityStats stats = clusterLocality.get().getClusterLocalityStats();
if (stats != null) {
String localityName =
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get()
.getClusterLocalityName();
args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);

ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
stats, inFlights, result.getStreamTracerFactory());
ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
.newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory);
}
}
}
return result;
Expand Down Expand Up @@ -447,4 +483,33 @@
stats.recordBackendLoadMetricStats(report.getNamedMetrics());
}
}

/**
* Represents the {@link ClusterLocalityStats} and network locality name of a cluster.
*/
static final class ClusterLocality {
private final ClusterLocalityStats clusterLocalityStats;
private final String clusterLocalityName;

@VisibleForTesting
ClusterLocality(ClusterLocalityStats localityStats, String localityName) {
this.clusterLocalityStats = localityStats;
this.clusterLocalityName = localityName;
}

ClusterLocalityStats getClusterLocalityStats() {
return clusterLocalityStats;
}

String getClusterLocalityName() {
return clusterLocalityName;
}

@VisibleForTesting
void release() {
if (clusterLocalityStats != null) {
clusterLocalityStats.release();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private synchronized void releaseClusterDropCounter(
String cluster, @Nullable String edsServiceName) {
checkState(allDropStats.containsKey(cluster)
&& allDropStats.get(cluster).containsKey(edsServiceName),
"stats for cluster %s, edsServiceName %s not exits", cluster, edsServiceName);
"stats for cluster %s, edsServiceName %s do not exist", cluster, edsServiceName);
ReferenceCounted<ClusterDropStats> ref = allDropStats.get(cluster).get(edsServiceName);
ref.release();
}
Expand Down
Loading
0