Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] CSDS getSubscribedResourcesMetadata concurrency fix #18

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -115,8 +118,10 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -151,6 +156,7 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_RBAC"))
|| Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_RBAC"));

private static final long METADATA_SYNC_TIMEOUT_SEC = 1;
private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 =
"type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2"
+ ".HttpConnectionManager";
Expand Down Expand Up @@ -1958,12 +1964,34 @@ public Collection<String> getSubscribedResources(ServerInfo serverInfo, Resource
}

@Override
Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
Map<String, ResourceMetadata> metadataMap = new HashMap<>();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
metadataMap.put(entry.getKey(), entry.getValue().metadata);
Map<ResourceType, Map<String, ResourceMetadata>> getSubscribedResourcesMetadataSnapshot() {
final SettableFuture<Map<ResourceType, Map<String, ResourceMetadata>>> future =
SettableFuture.create();
syncContext.execute(new Runnable() {
@Override
public void run() {
// A map from ResourceType to a map (ResourceName: ResourceMetadata)
ImmutableMap.Builder<ResourceType, Map<String, ResourceMetadata>> metadataSnapshot =
ImmutableMap.builder();
for (ResourceType type : ResourceType.values()) {
if (type == ResourceType.UNKNOWN) {
continue;
}
ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
for (Map.Entry<String, ResourceSubscriber> resourceEntry
: getSubscribedResourcesMap(type).entrySet()) {
metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
}
metadataSnapshot.put(type, metadataMap.build());
}
future.set(metadataSnapshot.build());
}
});
try {
return future.get(METADATA_SYNC_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new UncheckedExecutionException(e);
}
return metadataMap;
}

@Override
Expand Down
16 changes: 8 additions & 8 deletions xds/src/main/java/io/grpc/xds/CsdsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,15 @@ private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) {
ClientConfig.Builder builder = ClientConfig.newBuilder()
.setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode());
for (ResourceType type : ResourceType.values()) {
if (type == ResourceType.UNKNOWN) {
continue;
}
Map<String, ResourceMetadata> metadataMap = xdsClient.getSubscribedResourcesMetadata(type);
for (String resourceName : metadataMap.keySet()) {
ResourceMetadata metadata = metadataMap.get(resourceName);
for (Map.Entry<ResourceType, Map<String, ResourceMetadata>> metadataByTypeEntry
: xdsClient.getSubscribedResourcesMetadataSnapshot().entrySet()) {
ResourceType type = metadataByTypeEntry.getKey();
for (Map.Entry<String, ResourceMetadata> metadataEntry
: metadataByTypeEntry.getValue().entrySet()) {
ResourceMetadata metadata = metadataEntry.getValue();
GenericXdsConfig.Builder genericXdsConfigBuilder = GenericXdsConfig.newBuilder()
.setTypeUrl(type.typeUrl())
.setName(resourceName)
.setName(metadataEntry.getKey())
.setClientStatus(metadataStatusToClientStatus(metadata.getStatus()));
if (metadata.getRawResource() != null) {
genericXdsConfigBuilder
Expand All @@ -161,6 +160,7 @@ static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) {
.setXdsConfig(metadata.getRawResource());
}
if (metadata.getStatus() == ResourceMetadataStatus.NACKED) {
assert metadata.getErrorState() != null;
genericXdsConfigBuilder
.setErrorState(metadataUpdateFailureStateToProto(metadata.getErrorState()));
}
Expand Down
7 changes: 6 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,12 @@ TlsContextManager getTlsContextManager() {
throw new UnsupportedOperationException();
}

Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
/**
* Returns the map containing the {@link ResourceMetadata} of the subscribed resources for the
* given resource type, indexed by the resource name.
*/
// Must be synchronized.
Map<ResourceType, Map<String, ResourceMetadata>> getSubscribedResourcesMetadataSnapshot() {
throw new UnsupportedOperationException();
}

Expand Down
12 changes: 7 additions & 5 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,12 @@ protected static boolean matchErrorDetail(

private void verifySubscribedResourcesMetadataSizes(
int ldsSize, int cdsSize, int rdsSize, int edsSize) {
assertThat(xdsClient.getSubscribedResourcesMetadata(LDS)).hasSize(ldsSize);
assertThat(xdsClient.getSubscribedResourcesMetadata(CDS)).hasSize(cdsSize);
assertThat(xdsClient.getSubscribedResourcesMetadata(RDS)).hasSize(rdsSize);
assertThat(xdsClient.getSubscribedResourcesMetadata(EDS)).hasSize(edsSize);
Map<ResourceType, Map<String, ResourceMetadata>> subscribedResourcesMetadata =
xdsClient.getSubscribedResourcesMetadataSnapshot();
assertThat(subscribedResourcesMetadata.get(LDS)).hasSize(ldsSize);
assertThat(subscribedResourcesMetadata.get(CDS)).hasSize(cdsSize);
assertThat(subscribedResourcesMetadata.get(RDS)).hasSize(rdsSize);
assertThat(subscribedResourcesMetadata.get(EDS)).hasSize(edsSize);
}

/** Verify the resource requested, but not updated. */
Expand Down Expand Up @@ -435,7 +437,7 @@ private ResourceMetadata verifyResourceMetadata(
ResourceType type, String resourceName, Any rawResource, ResourceMetadataStatus status,
String versionInfo, long updateTimeNanos, boolean hasErrorState) {
ResourceMetadata resourceMetadata =
xdsClient.getSubscribedResourcesMetadata(type).get(resourceName);
xdsClient.getSubscribedResourcesMetadataSnapshot().get(type).get(resourceName);
assertThat(resourceMetadata).isNotNull();
String name = type.toString() + " resource '" + resourceName + "' metadata field ";
assertWithMessage(name + "status").that(resourceMetadata.getStatus()).isEqualTo(status);
Expand Down
25 changes: 9 additions & 16 deletions xds/src/test/java/io/grpc/xds/CsdsServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Bootstrapper.BootstrapInfo getBootstrapInfo() {
}

@Override
Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
Map<ResourceType, Map<String, ResourceMetadata>> getSubscribedResourcesMetadataSnapshot() {
return ImmutableMap.of();
}
};
Expand Down Expand Up @@ -130,7 +130,7 @@ public void fetchClientConfig_invalidArgument() {
public void fetchClientConfig_unexpectedException() {
XdsClient throwingXdsClient = new XdsClient() {
@Override
Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
Map<ResourceType, Map<String, ResourceMetadata>> getSubscribedResourcesMetadataSnapshot() {
throw new IllegalArgumentException("IllegalArgumentException");
}
};
Expand Down Expand Up @@ -302,20 +302,13 @@ Bootstrapper.BootstrapInfo getBootstrapInfo() {
}

@Override
Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
switch (type) {
case LDS:
return ImmutableMap.of("subscribedResourceName." + type.name(), METADATA_ACKED_LDS);
case RDS:
return ImmutableMap.of("subscribedResourceName." + type.name(), METADATA_ACKED_RDS);
case CDS:
return ImmutableMap.of("subscribedResourceName." + type.name(), METADATA_ACKED_CDS);
case EDS:
return ImmutableMap.of("subscribedResourceName." + type.name(), METADATA_ACKED_EDS);
case UNKNOWN:
default:
throw new AssertionError("Unexpected resource name");
}
Map<ResourceType, Map<String, ResourceMetadata>> getSubscribedResourcesMetadataSnapshot() {
return new ImmutableMap.Builder<ResourceType, Map<String, ResourceMetadata>>()
.put(LDS, ImmutableMap.of("subscribedResourceName.LDS", METADATA_ACKED_LDS))
.put(RDS, ImmutableMap.of("subscribedResourceName.RDS", METADATA_ACKED_RDS))
.put(CDS, ImmutableMap.of("subscribedResourceName.CDS", METADATA_ACKED_CDS))
.put(EDS, ImmutableMap.of("subscribedResourceName.EDS", METADATA_ACKED_EDS))
.build();
}
});

Expand Down