Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce addTransportFilter to NettyChannel #3892

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions core/src/main/java/io/grpc/ClientTransportFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2017, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

/**
* Listens on client transport life-cycle events, with the capability to read and/or change
* transport attributes. Attributes returned by this filter will be merged into {@link
* ClientCall#getAttributes}.
*
* <p>Multiple filters maybe registered to a channel, in which case the output of a filter is the
* input of the next filter. For example, what returned by {@link #transportReady} of a filter is
* passed to the same method of the next filter, and the last filter's return value is the effective
* transport attributes.
*
* <p>{@link Grpc} defines commonly used attributes.
*
* @since 1.10.0
*/
public abstract class ClientTransportFilter {
/**
* Called when a transport is ready to process streams. All necessary handshakes, e.g., TLS
* handshake, are done at this point.
*
* <p>Note the implementation should always inherit the passed-in attributes using {@code
* Attributes.newBuilder(transportAttrs)}, instead of creating one from scratch.
*
* @param transportAttrs current transport attributes
*
* @return new transport attributes. Default implementation returns the passed-in attributes
* intact.
*/
public Attributes transportReady(Attributes transportAttrs) {
return transportAttrs;
}

/**
* Called when a transport is terminated. Default implementation is no-op.
*
* @param transportAttrs the effective transport attributes, which is what returned by {@link
* #transportReady} of the last executed filter.
*/
public void transportTerminated(Attributes transportAttrs) {
}
}
6 changes: 6 additions & 0 deletions core/src/main/java/io/grpc/ForwardingChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ public T intercept(ClientInterceptor... interceptors) {
return thisT();
}

@Override
public T addTransportFilter(ClientTransportFilter filter) {
delegate().addTransportFilter(filter);
return thisT();
}

@Override
public T userAgent(String userAgent) {
delegate().userAgent(userAgent);
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/io/grpc/ManagedChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ public static ManagedChannelBuilder<?> forTarget(String target) {
*/
public abstract T intercept(ClientInterceptor... interceptors);

/**
* Adds a {@link ClientTransportFilter}. The order of filters being added is the order they will
* be executed.
*
* @return this
* @since 1.10.0
*/
public abstract T addTransportFilter(ClientTransportFilter filter);

/**
* Provides a custom {@code User-Agent} for the application.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
import io.grpc.ClientTransportFilter;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.EquivalentAddressGroup;
Expand Down Expand Up @@ -98,6 +99,9 @@ public static ManagedChannelBuilder<?> forTarget(String target) {

private final List<ClientInterceptor> interceptors = new ArrayList<ClientInterceptor>();

protected final List<ClientTransportFilter> transportFilters =
new ArrayList<ClientTransportFilter>();

// Access via getter, which may perform authority override as needed
private NameResolver.Factory nameResolverFactory = DEFAULT_NAME_RESOLVER_FACTORY;

Expand Down Expand Up @@ -205,6 +209,12 @@ public final T intercept(ClientInterceptor... interceptors) {
return intercept(Arrays.asList(interceptors));
}

@Override
public final T addTransportFilter(ClientTransportFilter filter) {
this.transportFilters.add(filter);
return thisT();
}

@Override
public final T nameResolverFactory(NameResolver.Factory resolverFactory) {
Preconditions.checkState(directServerAddress == null,
Expand Down Expand Up @@ -345,6 +355,7 @@ public ManagedChannel build() {
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
GrpcUtil.STOPWATCH_SUPPLIER,
getEffectiveInterceptors(),
this.transportFilters,
GrpcUtil.getProxyDetector(),
ChannelTracer.getDefaultFactory());
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ClientTransportFilter;
import io.grpc.CompressorRegistry;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
Expand Down Expand Up @@ -136,6 +137,7 @@ public final class ManagedChannelImpl extends ManagedChannel {
* any interceptors this will just be {@link RealChannel}.
*/
private final Channel interceptorChannel;
private final List<ClientTransportFilter> transportFilters;
@Nullable private final String userAgent;

// Only null after channel is terminated. Must be assigned from the channelExecutor.
Expand Down Expand Up @@ -446,6 +448,7 @@ ClientStream newStream() {
ObjectPool<? extends Executor> oobExecutorPool,
Supplier<Stopwatch> stopwatchSupplier,
List<ClientInterceptor> interceptors,
List<ClientTransportFilter> transportFilters,
ProxyDetector proxyDetector,
ChannelTracer.Factory channelTracerFactory) {
this.target = checkNotNull(builder.target, "target");
Expand All @@ -463,6 +466,7 @@ ClientStream newStream() {
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
this.transportFilters = checkNotNull(transportFilters, "transportFilters");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
this.idleTimeoutMillis = builder.idleTimeoutMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientTransportFilter;
import io.grpc.EquivalentAddressGroup;
import io.grpc.IntegerMarshaller;
import io.grpc.LoadBalancer;
Expand Down Expand Up @@ -142,7 +143,7 @@ class Builder extends AbstractManagedChannelImplBuilder<Builder> {
channel = new ManagedChannelImpl(
builder, mockTransportFactory, new FakeBackoffPolicyProvider(),
oobExecutorPool, timer.getStopwatchSupplier(),
Collections.<ClientInterceptor>emptyList(),
Collections.<ClientInterceptor>emptyList(), Collections.<ClientTransportFilter>emptyList(),
GrpcUtil.NOOP_PROXY_DETECTOR, ChannelTracer.getDefaultFactory());
newTransports = TestUtils.captureTransports(mockTransportFactory);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientTransportFilter;
import io.grpc.ConnectivityStateInfo;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
Expand Down Expand Up @@ -229,7 +230,8 @@ class Builder extends AbstractManagedChannelImplBuilder<Builder> {
checkState(channel == null);
channel = new ManagedChannelImpl(
builder, mockTransportFactory, new FakeBackoffPolicyProvider(),
oobExecutorPool, timer.getStopwatchSupplier(), interceptors, GrpcUtil.NOOP_PROXY_DETECTOR,
oobExecutorPool, timer.getStopwatchSupplier(), interceptors,
Collections.<ClientTransportFilter>emptyList(), GrpcUtil.NOOP_PROXY_DETECTOR,
channelStatsFactory);

if (requestConnection) {
Expand Down
10 changes: 7 additions & 3 deletions netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.Attributes;
import io.grpc.ClientTransportFilter;
import io.grpc.ExperimentalApi;
import io.grpc.Internal;
import io.grpc.NameResolver;
Expand All @@ -47,6 +48,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -322,7 +324,7 @@ protected ClientTransportFactory buildTransportFactory() {
return new NettyTransportFactory(dynamicParamsFactory, channelType, channelOptions,
negotiationType, sslContext, eventLoopGroup, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory.create());
transportFilters, transportTracerFactory.create());
}

@Override
Expand Down Expand Up @@ -458,6 +460,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto
private final AtomicBackoff keepAliveTimeNanos;
private final long keepAliveTimeoutNanos;
private final boolean keepAliveWithoutCalls;
private final List<ClientTransportFilter> transportFilters;
private final TransportTracer transportTracer;

private boolean closed;
Expand All @@ -467,10 +470,11 @@ private static final class NettyTransportFactory implements ClientTransportFacto
NegotiationType negotiationType, SslContext sslContext, EventLoopGroup group,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
TransportTracer transportTracer) {
List<ClientTransportFilter> transportFilters, TransportTracer transportTracer) {
this.channelType = channelType;
this.negotiationType = negotiationType;
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.transportFilters = transportFilters;
this.transportTracer = transportTracer;

if (transportCreationParamsFilterFactory == null) {
Expand Down Expand Up @@ -515,7 +519,7 @@ public void run() {
dparams.getProtocolNegotiator(), flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(),
tooManyPingsRunnable, transportTracer);
tooManyPingsRunnable, transportFilters, transportTracer);
return transport;
}

Expand Down
20 changes: 20 additions & 0 deletions netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import io.grpc.Attributes;
import io.grpc.ClientTransportFilter;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
Expand Down Expand Up @@ -69,6 +70,7 @@
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
import io.netty.handler.logging.LogLevel;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -99,6 +101,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private final KeepAliveManager keepAliveManager;
// Returns new unstarted stopwatches
private final Supplier<Stopwatch> stopwatchFactory;
private final List<ClientTransportFilter> transportFilters;
private final TransportTracer transportTracer;
private WriteQueue clientWriteQueue;
private Http2Ping ping;
Expand All @@ -111,6 +114,7 @@ static NettyClientHandler newHandler(
int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory,
Runnable tooManyPingsRunnable,
List<ClientTransportFilter> transportFilters,
TransportTracer transportTracer) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Expand All @@ -133,6 +137,7 @@ static NettyClientHandler newHandler(
maxHeaderListSize,
stopwatchFactory,
tooManyPingsRunnable,
transportFilters,
transportTracer);
}

Expand All @@ -147,6 +152,7 @@ static NettyClientHandler newHandler(
int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory,
Runnable tooManyPingsRunnable,
List<ClientTransportFilter> transportFilters,
TransportTracer transportTracer) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Expand Down Expand Up @@ -196,6 +202,7 @@ public TransportTracer.FlowControlWindows read() {
keepAliveManager,
stopwatchFactory,
tooManyPingsRunnable,
transportFilters,
transportTracer);
}

Expand All @@ -207,11 +214,13 @@ private NettyClientHandler(
KeepAliveManager keepAliveManager,
Supplier<Stopwatch> stopwatchFactory,
final Runnable tooManyPingsRunnable,
List<ClientTransportFilter> transportFilters,
TransportTracer transportTracer) {
super(decoder, encoder, settings);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory;
this.transportFilters = Preconditions.checkNotNull(transportFilters);
this.transportTracer = Preconditions.checkNotNull(transportTracer);

// Set the frame listener on the decoder.
Expand Down Expand Up @@ -370,6 +379,10 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
for (ClientTransportFilter filter : transportFilters) {
filter.transportTerminated(attributes);
}

try {
logger.fine("Network channel is closed");
Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
Expand Down Expand Up @@ -696,6 +709,13 @@ public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
if (firstSettings) {
firstSettings = false;
lifecycleManager.notifyReady();

Attributes temporary = attributes;
for (ClientTransportFilter filter : transportFilters) {
temporary = Preconditions.checkNotNull(filter.transportReady(temporary),
"Filter %s returned null", filter);
}
attributes = temporary;
}
}

Expand Down
8 changes: 7 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientTransportFilter;
import io.grpc.InternalLogId;
import io.grpc.InternalTransportStats;
import io.grpc.Metadata;
Expand All @@ -51,6 +52,7 @@
import io.netty.util.AsciiString;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
Expand Down Expand Up @@ -87,6 +89,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private Status statusExplainingWhyTheChannelIsNull;
/** Since not thread-safe, may only be used from event loop. */
private ClientTransportLifecycleManager lifecycleManager;
private List<ClientTransportFilter> transportFilters;
/** Since not thread-safe, may only be used from event loop. */
private final TransportTracer transportTracer;

Expand All @@ -96,7 +99,8 @@ class NettyClientTransport implements ConnectionClientTransport {
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
Runnable tooManyPingsRunnable, TransportTracer transportTracer) {
Runnable tooManyPingsRunnable, List<ClientTransportFilter> transportFilters,
TransportTracer transportTracer) {
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
this.address = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
Expand All @@ -112,6 +116,7 @@ class NettyClientTransport implements ConnectionClientTransport {
this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent));
this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
this.transportFilters = Preconditions.checkNotNull(transportFilters, "transportFilters");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
}

Expand Down Expand Up @@ -192,6 +197,7 @@ public Runnable start(Listener transportListener) {
maxHeaderListSize,
GrpcUtil.STOPWATCH_SUPPLIER,
tooManyPingsRunnable,
transportFilters,
transportTracer);
NettyHandlerSettings.setAutoWindow(handler);

Expand Down
Loading