Skip to content

Commit

Permalink
grpc-js-xds: Allow tests to set bootstrap info in channel args
Browse files Browse the repository at this point in the history
  • Loading branch information
murgatroid99 committed Mar 10, 2023
1 parent 6bc6b86 commit e32bbc7
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 21 deletions.
8 changes: 5 additions & 3 deletions packages/grpc-js-xds/src/load-balancer-cds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export class CdsLoadBalancer implements LoadBalancer {

private latestConfig: CdsLoadBalancingConfig | null = null;
private latestAttributes: { [key: string]: unknown } = {};
private xdsClient: XdsClient | null = null;

constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
Expand Down Expand Up @@ -188,6 +189,7 @@ export class CdsLoadBalancer implements LoadBalancer {
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
this.latestAttributes = attributes;
this.xdsClient = attributes.xdsClient as XdsClient;

/* If the cluster is changing, disable the old watcher before adding the new
* one */
Expand All @@ -196,7 +198,7 @@ export class CdsLoadBalancer implements LoadBalancer {
this.latestConfig?.getCluster() !== lbConfig.getCluster()
) {
trace('Removing old cluster watcher for cluster name ' + this.latestConfig!.getCluster());
getSingletonXdsClient().removeClusterWatcher(
this.xdsClient.removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
Expand All @@ -212,7 +214,7 @@ export class CdsLoadBalancer implements LoadBalancer {

if (!this.isWatcherActive) {
trace('Adding new cluster watcher for cluster name ' + lbConfig.getCluster());
getSingletonXdsClient().addClusterWatcher(lbConfig.getCluster(), this.watcher);
this.xdsClient.addClusterWatcher(lbConfig.getCluster(), this.watcher);
this.isWatcherActive = true;
}
}
Expand All @@ -226,7 +228,7 @@ export class CdsLoadBalancer implements LoadBalancer {
trace('Destroying load balancer with cluster name ' + this.latestConfig?.getCluster());
this.childBalancer.destroy();
if (this.isWatcherActive) {
getSingletonXdsClient().removeClusterWatcher(
this.xdsClient?.removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
Expand Down
10 changes: 6 additions & 4 deletions packages/grpc-js-xds/src/load-balancer-eds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ export class EdsLoadBalancer implements LoadBalancer {

private lastestConfig: EdsLoadBalancingConfig | null = null;
private latestAttributes: { [key: string]: unknown } = {};
private xdsClient: XdsClient | null = null;
private latestEdsUpdate: ClusterLoadAssignment__Output | null = null;

/**
Expand Down Expand Up @@ -488,13 +489,14 @@ export class EdsLoadBalancer implements LoadBalancer {
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
this.lastestConfig = lbConfig;
this.latestAttributes = attributes;
this.xdsClient = attributes.xdsClient as XdsClient;
const newEdsServiceName = lbConfig.getEdsServiceName() ?? lbConfig.getCluster();

/* If the name is changing, disable the old watcher before adding the new
* one */
if (this.isWatcherActive && this.edsServiceName !== newEdsServiceName) {
trace('Removing old endpoint watcher for edsServiceName ' + this.edsServiceName)
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName!, this.watcher);
this.xdsClient.removeEndpointWatcher(this.edsServiceName!, this.watcher);
/* Setting isWatcherActive to false here lets us have one code path for
* calling addEndpointWatcher */
this.isWatcherActive = false;
Expand All @@ -507,12 +509,12 @@ export class EdsLoadBalancer implements LoadBalancer {

if (!this.isWatcherActive) {
trace('Adding new endpoint watcher for edsServiceName ' + this.edsServiceName);
getSingletonXdsClient().addEndpointWatcher(this.edsServiceName, this.watcher);
this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher);
this.isWatcherActive = true;
}

if (lbConfig.getLrsLoadReportingServerName()) {
this.clusterDropStats = getSingletonXdsClient().addClusterDropStats(
this.clusterDropStats = this.xdsClient.addClusterDropStats(
lbConfig.getLrsLoadReportingServerName()!,
lbConfig.getCluster(),
lbConfig.getEdsServiceName() ?? ''
Expand All @@ -533,7 +535,7 @@ export class EdsLoadBalancer implements LoadBalancer {
destroy(): void {
trace('Destroying load balancer with edsServiceName ' + this.edsServiceName);
if (this.edsServiceName) {
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName, this.watcher);
this.xdsClient?.removeEndpointWatcher(this.edsServiceName, this.watcher);
}
this.childBalancer.destroy();
}
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/src/load-balancer-lrs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ export class LrsLoadBalancer implements LoadBalancer {
if (!(lbConfig instanceof LrsLoadBalancingConfig)) {
return;
}
this.localityStatsReporter = getSingletonXdsClient().addClusterLocalityStats(
this.localityStatsReporter = (attributes.xdsClient as XdsClient).addClusterLocalityStats(
lbConfig.getLrsLoadReportingServerName(),
lbConfig.getClusterName(),
lbConfig.getEdsServiceName(),
Expand Down
26 changes: 19 additions & 7 deletions packages/grpc-js-xds/src/resolver-xds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from './environment'
import Filter = experimental.Filter;
import FilterFactory = experimental.FilterFactory;
import RetryPolicy = experimental.RetryPolicy;
import { validateBootstrapConfig } from './xds-bootstrap';

const TRACER_NAME = 'xds_resolver';

Expand Down Expand Up @@ -210,6 +211,8 @@ function getDefaultRetryMaxInterval(baseInterval: string): string {
return `${Number.parseFloat(baseInterval.substring(0, baseInterval.length - 1)) * 10}s`;
}

const BOOTSTRAP_CONFIG_KEY = 'grpc.TEST_ONLY_DO_NOT_USE_IN_PROD.xds_bootstrap_config';

const RETRY_CODES: {[key: string]: status} = {
'cancelled': status.CANCELLED,
'deadline-exceeded': status.DEADLINE_EXCEEDED,
Expand Down Expand Up @@ -238,11 +241,20 @@ class XdsResolver implements Resolver {

private ldsHttpFilterConfigs: {name: string, config: HttpFilterConfig}[] = [];

private xdsClient: XdsClient;

constructor(
private target: GrpcUri,
private listener: ResolverListener,
private channelOptions: ChannelOptions
) {
if (channelOptions[BOOTSTRAP_CONFIG_KEY]) {
const parsedConfig = JSON.parse(channelOptions[BOOTSTRAP_CONFIG_KEY]);
const validatedConfig = validateBootstrapConfig(parsedConfig);
this.xdsClient = new XdsClient(validatedConfig);
} else {
this.xdsClient = getSingletonXdsClient();
}
this.ldsWatcher = {
onValidUpdate: (update: Listener__Output) => {
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, update.api_listener!.api_listener!.value);
Expand All @@ -267,16 +279,16 @@ class XdsResolver implements Resolver {
const routeConfigName = httpConnectionManager.rds!.route_config_name;
if (this.latestRouteConfigName !== routeConfigName) {
if (this.latestRouteConfigName !== null) {
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
this.xdsClient.removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
}
getSingletonXdsClient().addRouteWatcher(httpConnectionManager.rds!.route_config_name, this.rdsWatcher);
this.xdsClient.addRouteWatcher(httpConnectionManager.rds!.route_config_name, this.rdsWatcher);
this.latestRouteConfigName = routeConfigName;
}
break;
}
case 'route_config':
if (this.latestRouteConfigName) {
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
this.xdsClient.removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
}
this.handleRouteConfig(httpConnectionManager.route_config!);
break;
Expand Down Expand Up @@ -546,7 +558,7 @@ class XdsResolver implements Resolver {
methodConfig: [],
loadBalancingConfig: [lbPolicyConfig]
}
this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {});
this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {xdsClient: this.xdsClient});
}

private reportResolutionError(reason: string) {
Expand All @@ -563,15 +575,15 @@ class XdsResolver implements Resolver {
// Wait until updateResolution is called once to start the xDS requests
if (!this.isLdsWatcherActive) {
trace('Starting resolution for target ' + uriToString(this.target));
getSingletonXdsClient().addListenerWatcher(this.target.path, this.ldsWatcher);
this.xdsClient.addListenerWatcher(this.target.path, this.ldsWatcher);
this.isLdsWatcherActive = true;
}
}

destroy() {
getSingletonXdsClient().removeListenerWatcher(this.target.path, this.ldsWatcher);
this.xdsClient.removeListenerWatcher(this.target.path, this.ldsWatcher);
if (this.latestRouteConfigName) {
getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
this.xdsClient.removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher);
}
}

Expand Down
6 changes: 3 additions & 3 deletions packages/grpc-js-xds/src/xds-bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ function validateNode(obj: any): Node {
return result;
}

function validateBootstrapFile(obj: any): BootstrapInfo {
export function validateBootstrapConfig(obj: any): BootstrapInfo {
return {
xdsServers: obj.xds_servers.map(validateXdsServerConfig),
node: validateNode(obj.node),
Expand Down Expand Up @@ -265,7 +265,7 @@ export async function loadBootstrapInfo(): Promise<BootstrapInfo> {
}
try {
const parsedFile = JSON.parse(data);
resolve(validateBootstrapFile(parsedFile));
resolve(validateBootstrapConfig(parsedFile));
} catch (e) {
reject(
new Error(
Expand All @@ -290,7 +290,7 @@ export async function loadBootstrapInfo(): Promise<BootstrapInfo> {
if (bootstrapConfig) {
try {
const parsedConfig = JSON.parse(bootstrapConfig);
const loadedBootstrapInfoValue = validateBootstrapFile(parsedConfig);
const loadedBootstrapInfoValue = validateBootstrapConfig(parsedConfig);
loadedBootstrapInfo = Promise.resolve(loadedBootstrapInfoValue);
} catch (e) {
throw new Error(
Expand Down
14 changes: 11 additions & 3 deletions packages/grpc-js-xds/src/xds-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel, connectivityState } from '@grpc/grpc-js';
import * as adsTypes from './generated/ads';
import * as lrsTypes from './generated/lrs';
import { loadBootstrapInfo } from './xds-bootstrap';
import { BootstrapInfo, loadBootstrapInfo } from './xds-bootstrap';
import { Node } from './generated/envoy/config/core/v3/Node';
import { AggregatedDiscoveryServiceClient } from './generated/envoy/service/discovery/v3/AggregatedDiscoveryService';
import { DiscoveryRequest } from './generated/envoy/service/discovery/v3/DiscoveryRequest';
Expand Down Expand Up @@ -276,7 +276,7 @@ export class XdsClient {
private adsBackoff: BackoffTimeout;
private lrsBackoff: BackoffTimeout;

constructor() {
constructor(bootstrapInfoOverride?: BootstrapInfo) {
const edsState = new EdsState(() => {
this.updateNames('eds');
});
Expand Down Expand Up @@ -310,7 +310,15 @@ export class XdsClient {
});
this.lrsBackoff.unref();

Promise.all([loadBootstrapInfo(), loadAdsProtos()]).then(
async function getBootstrapInfo(): Promise<BootstrapInfo> {
if (bootstrapInfoOverride) {
return bootstrapInfoOverride;
} else {
return loadBootstrapInfo();
}
}

Promise.all([getBootstrapInfo(), loadAdsProtos()]).then(
([bootstrapInfo, protoDefinitions]) => {
if (this.hasShutdown) {
return;
Expand Down
8 changes: 8 additions & 0 deletions packages/grpc-js-xds/src/xds-stream-state/eds-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,27 +61,33 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
const priorityTotalWeights: Map<number, number> = new Map();
for (const endpoint of message.endpoints) {
if (!endpoint.locality) {
trace('EDS validation: endpoint locality unset');
return false;
}
for (const {locality, priority} of seenLocalities) {
if (localitiesEqual(endpoint.locality, locality) && endpoint.priority === priority) {
trace('EDS validation: endpoint locality duplicated: ' + JSON.stringify(locality) + ', priority=' + priority);
return false;
}
}
seenLocalities.push({locality: endpoint.locality, priority: endpoint.priority});
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {
trace('EDS validation: endpoint socket_address not set');
return false;
}
if (socketAddress.port_specifier !== 'port_value') {
trace('EDS validation: socket_address.port_specifier !== "port_value"');
return false;
}
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
trace('EDS validation: address not a valid IPv4 or IPv6 address: ' + socketAddress.address);
return false;
}
for (const address of seenAddresses) {
if (addressesEqual(socketAddress, address)) {
trace('EDS validation: duplicate address seen: ' + address);
return false;
}
}
Expand All @@ -91,11 +97,13 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
}
for (const totalWeight of priorityTotalWeights.values()) {
if (totalWeight > UINT32_MAX) {
trace('EDS validation: total weight > UINT32_MAX')
return false;
}
}
for (const priority of priorityTotalWeights.keys()) {
if (priority > 0 && !priorityTotalWeights.has(priority - 1)) {
trace('EDS validation: priorities not contiguous');
return false;
}
}
Expand Down

0 comments on commit e32bbc7

Please sign in to comment.