Skip to content

Commit

Permalink
xds: add xdsTransportFactory interface (#10827)
Browse files Browse the repository at this point in the history
This interface will be used to replace XdsChannelTransport.
  • Loading branch information
YifeiZhuang authored Jan 17, 2024
1 parent a151099 commit 005f8c0
Show file tree
Hide file tree
Showing 3 changed files with 368 additions and 0 deletions.
135 changes: 135 additions & 0 deletions xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;

import io.grpc.CallOptions;
import io.grpc.ChannelCredentials;
import io.grpc.ClientCall;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.concurrent.TimeUnit;

final class GrpcXdsTransportFactory implements XdsTransportFactory {

static final XdsTransportFactory DEFAULT_XDS_TRANSPORT_FACTORY = new GrpcXdsTransportFactory();

@Override
public XdsTransport create(Bootstrapper.ServerInfo serverInfo) {
return new GrpcXdsTransport(serverInfo);
}

private class GrpcXdsTransport implements XdsTransport {

private final ManagedChannel channel;

public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) {
String target = serverInfo.target();
ChannelCredentials channelCredentials = serverInfo.channelCredentials();
this.channel = Grpc.newChannelBuilder(target, channelCredentials)
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
}

@Override
public <ReqT, RespT> StreamingCall<ReqT, RespT> createStreamingCall(
String fullMethodName,
MethodDescriptor.Marshaller<ReqT> reqMarshaller,
MethodDescriptor.Marshaller<RespT> respMarshaller) {
return new XdsStreamingCall<>(fullMethodName, reqMarshaller, respMarshaller);
}

@Override
public void shutdown() {
channel.shutdown();
}

private class XdsStreamingCall<ReqT, RespT> implements
XdsTransportFactory.StreamingCall<ReqT, RespT> {

private final ClientCall<ReqT, RespT> call;

public XdsStreamingCall(String methodName, MethodDescriptor.Marshaller<ReqT> reqMarshaller,
MethodDescriptor.Marshaller<RespT> respMarshaller) {
this.call = channel.newCall(
MethodDescriptor.<ReqT, RespT>newBuilder()
.setFullMethodName(methodName)
.setType(MethodDescriptor.MethodType.BIDI_STREAMING)
.setRequestMarshaller(reqMarshaller)
.setResponseMarshaller(respMarshaller)
.build(),
CallOptions.DEFAULT); // TODO(zivy): support waitForReady
}

@Override
public void start(EventHandler<RespT> eventHandler) {
call.start(new EventHandlerToCallListenerAdapter<>(eventHandler), new Metadata());
call.request(1);
}

@Override
public void sendMessage(ReqT message) {
call.sendMessage(message);
}

@Override
public void startRecvMessage() {
call.request(1);
}

@Override
public void sendError(Exception e) {
call.cancel("Cancelled by XdsClientImpl", e);
}

@Override
public boolean isReady() {
return call.isReady();
}
}
}

private static class EventHandlerToCallListenerAdapter<T> extends ClientCall.Listener<T> {
private final EventHandler<T> handler;

EventHandlerToCallListenerAdapter(EventHandler<T> eventHandler) {
this.handler = checkNotNull(eventHandler, "eventHandler");
}

@Override
public void onHeaders(Metadata headers) {}

@Override
public void onMessage(T message) {
handler.onRecvMessage(message);
}

@Override
public void onClose(Status status, Metadata trailers) {
handler.onStatusReceived(status);
}

@Override
public void onReady() {
handler.onReady();
}
}
}
93 changes: 93 additions & 0 deletions xds/src/main/java/io/grpc/xds/XdsTransportFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import io.grpc.ExperimentalApi;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

/**
* A factory for creating new XdsTransport instances.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10823")
public interface XdsTransportFactory {
XdsTransport create(Bootstrapper.ServerInfo serverInfo);

/**
* Represents transport for xDS communication (e.g., a gRPC channel).
*/
interface XdsTransport {
<ReqT, RespT> StreamingCall<ReqT, RespT> createStreamingCall(
String fullMethodName, MethodDescriptor.Marshaller<ReqT> reqMarshaller,
MethodDescriptor.Marshaller<RespT> respMarshaller);

void shutdown();
}

/**
* Represents a bidi streaming RPC call.
*/
interface StreamingCall<ReqT, RespT> {
void start(EventHandler<RespT> eventHandler);

/**
* Sends a message on the stream.
* Only one message will be in flight at a time; subsequent
* messages will not be sent until this one is done.
*/
void sendMessage(ReqT message);

/**
* Requests a message to be received.
*/
void startRecvMessage();

/**
* An error is encountered. Sends the error.
*/
void sendError(Exception e);

/** Indicates whether call is capable of sending additional messages without requiring
* excessive buffering internally. Used for resource initial fetch timeout notification. See
* also {@link EventHandler#onReady()}. Application is free to ignore it.
*/
boolean isReady();
}

/**
* An interface for handling events on a streaming call.
**/
interface EventHandler<RespT> {

/**
* Called when the stream is ready to send additional messages. If called the library use this
* handler to trigger resource arrival timeout, also see {@link StreamingCall#isReady()}.
* Application is free to ignore it.
*/
void onReady();

/**
* Called when a message is received on the stream.
*/
void onRecvMessage(RespT message);

/**
* Called when status is received on the stream.
*/
void onStatusReceived(Status status);
}
}
140 changes: 140 additions & 0 deletions xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import static com.google.common.truth.Truth.assertThat;

import com.google.common.util.concurrent.SettableFuture;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.BindableService;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.InsecureServerCredentials;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class GrpcXdsTransportFactoryTest {

private Server server;

@Before
public void setup() throws Exception {
server = Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create())
.addService(echoAdsService())
.build()
.start();
}

@After
public void tearDown() {
server.shutdown();
}

private BindableService echoAdsService() {
return new AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase() {
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<DiscoveryResponse> responseObserver) {
StreamObserver<DiscoveryRequest> requestObserver = new StreamObserver<DiscoveryRequest>() {
@Override
public void onNext(DiscoveryRequest value) {
responseObserver.onNext(DiscoveryResponse.newBuilder()
.setVersionInfo(value.getVersionInfo())
.setNonce(value.getResponseNonce())
.build());
}

@Override
public void onError(Throwable t) {
responseObserver.onError(t);
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};

return requestObserver;
}
};
}

@Test
public void callApis() throws Exception {
XdsTransportFactory.XdsTransport xdsTransport =
GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY.create(
Bootstrapper.ServerInfo.create("localhost:" + server.getPort(),
InsecureChannelCredentials.create()));
MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
XdsTransportFactory.StreamingCall<DiscoveryRequest, DiscoveryResponse> streamingCall =
xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(),
methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller());
FakeEventHandler fakeEventHandler = new FakeEventHandler();
streamingCall.start(fakeEventHandler);
streamingCall.sendMessage(
DiscoveryRequest.newBuilder().setVersionInfo("v1").setResponseNonce("2024").build());
DiscoveryResponse response = fakeEventHandler.respQ.poll(5000, TimeUnit.MILLISECONDS);
assertThat(response.getVersionInfo()).isEqualTo("v1");
assertThat(response.getNonce()).isEqualTo("2024");
assertThat(fakeEventHandler.ready.get(5000, TimeUnit.MILLISECONDS)).isTrue();
Exception expectedException = new IllegalStateException("Test cancel stream.");
streamingCall.sendError(expectedException);
Status realStatus = fakeEventHandler.endFuture.get(5000, TimeUnit.MILLISECONDS);
assertThat(realStatus.getDescription()).isEqualTo("Cancelled by XdsClientImpl");
assertThat(realStatus.getCode()).isEqualTo(Status.CANCELLED.getCode());
assertThat(realStatus.getCause()).isEqualTo(expectedException);
xdsTransport.shutdown();
}

private static class FakeEventHandler implements
XdsTransportFactory.EventHandler<DiscoveryResponse> {
private final BlockingQueue<DiscoveryResponse> respQ = new LinkedBlockingQueue<>();
private SettableFuture<Status> endFuture = SettableFuture.create();
private SettableFuture<Boolean> ready = SettableFuture.create();

@Override
public void onReady() {
ready.set(true);
}

@Override
public void onRecvMessage(DiscoveryResponse message) {
respQ.offer(message);
}

@Override
public void onStatusReceived(Status status) {
endFuture.set(status);
}
}
}

0 comments on commit 005f8c0

Please sign in to comment.