diff --git a/core/src/main/java/io/grpc/ClientTransportFilter.java b/core/src/main/java/io/grpc/ClientTransportFilter.java
new file mode 100644
index 00000000000..98ce983490d
--- /dev/null
+++ b/core/src/main/java/io/grpc/ClientTransportFilter.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc;
+
+/**
+ * Listens on client transport life-cycle events, with the capability to read and/or change
+ * transport attributes. Attributes returned by this filter will be merged into {@link
+ * ClientCall#getAttributes}.
+ *
+ *
Multiple filters maybe registered to a channel, in which case the output of a filter is the
+ * input of the next filter. For example, what returned by {@link #transportReady} of a filter is
+ * passed to the same method of the next filter, and the last filter's return value is the effective
+ * transport attributes.
+ *
+ *
{@link Grpc} defines commonly used attributes.
+ *
+ * @since 1.10.0
+ */
+public abstract class ClientTransportFilter {
+ /**
+ * Called when a transport is ready to process streams. All necessary handshakes, e.g., TLS
+ * handshake, are done at this point.
+ *
+ *
Note the implementation should always inherit the passed-in attributes using {@code
+ * Attributes.newBuilder(transportAttrs)}, instead of creating one from scratch.
+ *
+ * @param transportAttrs current transport attributes
+ *
+ * @return new transport attributes. Default implementation returns the passed-in attributes
+ * intact.
+ */
+ public Attributes transportReady(Attributes transportAttrs) {
+ return transportAttrs;
+ }
+
+ /**
+ * Called when a transport is terminated. Default implementation is no-op.
+ *
+ * @param transportAttrs the effective transport attributes, which is what returned by {@link
+ * #transportReady} of the last executed filter.
+ */
+ public void transportTerminated(Attributes transportAttrs) {
+ }
+}
diff --git a/core/src/main/java/io/grpc/ForwardingChannelBuilder.java b/core/src/main/java/io/grpc/ForwardingChannelBuilder.java
index da9d8a3b2cf..bd172e0b605 100644
--- a/core/src/main/java/io/grpc/ForwardingChannelBuilder.java
+++ b/core/src/main/java/io/grpc/ForwardingChannelBuilder.java
@@ -79,6 +79,12 @@ public T intercept(ClientInterceptor... interceptors) {
return thisT();
}
+ @Override
+ public T addTransportFilter(ClientTransportFilter filter) {
+ delegate().addTransportFilter(filter);
+ return thisT();
+ }
+
@Override
public T userAgent(String userAgent) {
delegate().userAgent(userAgent);
diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
index 53dc49a6ab2..672fa3871ed 100644
--- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -120,6 +120,15 @@ public static ManagedChannelBuilder> forTarget(String target) {
*/
public abstract T intercept(ClientInterceptor... interceptors);
+ /**
+ * Adds a {@link ClientTransportFilter}. The order of filters being added is the order they will
+ * be executed.
+ *
+ * @return this
+ * @since 1.10.0
+ */
+ public abstract T addTransportFilter(ClientTransportFilter filter);
+
/**
* Provides a custom {@code User-Agent} for the application.
*
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index 765170b8e45..67006b39a81 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -23,6 +23,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
+import io.grpc.ClientTransportFilter;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.EquivalentAddressGroup;
@@ -98,6 +99,9 @@ public static ManagedChannelBuilder> forTarget(String target) {
private final List interceptors = new ArrayList();
+ protected final List transportFilters =
+ new ArrayList();
+
// Access via getter, which may perform authority override as needed
private NameResolver.Factory nameResolverFactory = DEFAULT_NAME_RESOLVER_FACTORY;
@@ -205,6 +209,12 @@ public final T intercept(ClientInterceptor... interceptors) {
return intercept(Arrays.asList(interceptors));
}
+ @Override
+ public final T addTransportFilter(ClientTransportFilter filter) {
+ this.transportFilters.add(filter);
+ return thisT();
+ }
+
@Override
public final T nameResolverFactory(NameResolver.Factory resolverFactory) {
Preconditions.checkState(directServerAddress == null,
@@ -345,6 +355,7 @@ public ManagedChannel build() {
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
GrpcUtil.STOPWATCH_SUPPLIER,
getEffectiveInterceptors(),
+ this.transportFilters,
GrpcUtil.getProxyDetector(),
ChannelTracer.getDefaultFactory());
}
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 215d1a6c3ab..f49e23ac24d 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -34,6 +34,7 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
+import io.grpc.ClientTransportFilter;
import io.grpc.CompressorRegistry;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
@@ -136,6 +137,7 @@ public final class ManagedChannelImpl extends ManagedChannel {
* any interceptors this will just be {@link RealChannel}.
*/
private final Channel interceptorChannel;
+ private final List transportFilters;
@Nullable private final String userAgent;
// Only null after channel is terminated. Must be assigned from the channelExecutor.
@@ -446,6 +448,7 @@ ClientStream newStream() {
ObjectPool extends Executor> oobExecutorPool,
Supplier stopwatchSupplier,
List interceptors,
+ List transportFilters,
ProxyDetector proxyDetector,
ChannelTracer.Factory channelTracerFactory) {
this.target = checkNotNull(builder.target, "target");
@@ -463,6 +466,7 @@ ClientStream newStream() {
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
+ this.transportFilters = checkNotNull(transportFilters, "transportFilters");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
this.idleTimeoutMillis = builder.idleTimeoutMillis;
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
index b58f205fdda..f6025c6a933 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
@@ -36,6 +36,7 @@
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
+import io.grpc.ClientTransportFilter;
import io.grpc.EquivalentAddressGroup;
import io.grpc.IntegerMarshaller;
import io.grpc.LoadBalancer;
@@ -142,7 +143,7 @@ class Builder extends AbstractManagedChannelImplBuilder {
channel = new ManagedChannelImpl(
builder, mockTransportFactory, new FakeBackoffPolicyProvider(),
oobExecutorPool, timer.getStopwatchSupplier(),
- Collections.emptyList(),
+ Collections.emptyList(), Collections.emptyList(),
GrpcUtil.NOOP_PROXY_DETECTOR, ChannelTracer.getDefaultFactory());
newTransports = TestUtils.captureTransports(mockTransportFactory);
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index a1efa7b271d..10fe0259658 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -56,6 +56,7 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
+import io.grpc.ClientTransportFilter;
import io.grpc.ConnectivityStateInfo;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
@@ -229,7 +230,8 @@ class Builder extends AbstractManagedChannelImplBuilder {
checkState(channel == null);
channel = new ManagedChannelImpl(
builder, mockTransportFactory, new FakeBackoffPolicyProvider(),
- oobExecutorPool, timer.getStopwatchSupplier(), interceptors, GrpcUtil.NOOP_PROXY_DETECTOR,
+ oobExecutorPool, timer.getStopwatchSupplier(), interceptors,
+ Collections.emptyList(), GrpcUtil.NOOP_PROXY_DETECTOR,
channelStatsFactory);
if (requestConnection) {
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index 92ef0ca728d..2416fd03659 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -27,6 +27,7 @@
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.Attributes;
+import io.grpc.ClientTransportFilter;
import io.grpc.ExperimentalApi;
import io.grpc.Internal;
import io.grpc.NameResolver;
@@ -47,6 +48,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -322,7 +324,7 @@ protected ClientTransportFactory buildTransportFactory() {
return new NettyTransportFactory(dynamicParamsFactory, channelType, channelOptions,
negotiationType, sslContext, eventLoopGroup, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
- transportTracerFactory.create());
+ transportFilters, transportTracerFactory.create());
}
@Override
@@ -458,6 +460,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto
private final AtomicBackoff keepAliveTimeNanos;
private final long keepAliveTimeoutNanos;
private final boolean keepAliveWithoutCalls;
+ private final List transportFilters;
private final TransportTracer transportTracer;
private boolean closed;
@@ -467,10 +470,11 @@ private static final class NettyTransportFactory implements ClientTransportFacto
NegotiationType negotiationType, SslContext sslContext, EventLoopGroup group,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
- TransportTracer transportTracer) {
+ List transportFilters, TransportTracer transportTracer) {
this.channelType = channelType;
this.negotiationType = negotiationType;
this.channelOptions = new HashMap, Object>(channelOptions);
+ this.transportFilters = transportFilters;
this.transportTracer = transportTracer;
if (transportCreationParamsFilterFactory == null) {
@@ -515,7 +519,7 @@ public void run() {
dparams.getProtocolNegotiator(), flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(),
- tooManyPingsRunnable, transportTracer);
+ tooManyPingsRunnable, transportFilters, transportTracer);
return transport;
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index f680cd8e9c4..0cae711b18a 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -24,6 +24,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import io.grpc.Attributes;
+import io.grpc.ClientTransportFilter;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
@@ -69,6 +70,7 @@
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
import io.netty.handler.logging.LogLevel;
import java.nio.channels.ClosedChannelException;
+import java.util.List;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -99,6 +101,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private final KeepAliveManager keepAliveManager;
// Returns new unstarted stopwatches
private final Supplier stopwatchFactory;
+ private final List transportFilters;
private final TransportTracer transportTracer;
private WriteQueue clientWriteQueue;
private Http2Ping ping;
@@ -111,6 +114,7 @@ static NettyClientHandler newHandler(
int maxHeaderListSize,
Supplier stopwatchFactory,
Runnable tooManyPingsRunnable,
+ List transportFilters,
TransportTracer transportTracer) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
@@ -133,6 +137,7 @@ static NettyClientHandler newHandler(
maxHeaderListSize,
stopwatchFactory,
tooManyPingsRunnable,
+ transportFilters,
transportTracer);
}
@@ -147,6 +152,7 @@ static NettyClientHandler newHandler(
int maxHeaderListSize,
Supplier stopwatchFactory,
Runnable tooManyPingsRunnable,
+ List transportFilters,
TransportTracer transportTracer) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
@@ -196,6 +202,7 @@ public TransportTracer.FlowControlWindows read() {
keepAliveManager,
stopwatchFactory,
tooManyPingsRunnable,
+ transportFilters,
transportTracer);
}
@@ -207,11 +214,13 @@ private NettyClientHandler(
KeepAliveManager keepAliveManager,
Supplier stopwatchFactory,
final Runnable tooManyPingsRunnable,
+ List transportFilters,
TransportTracer transportTracer) {
super(decoder, encoder, settings);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory;
+ this.transportFilters = Preconditions.checkNotNull(transportFilters);
this.transportTracer = Preconditions.checkNotNull(transportTracer);
// Set the frame listener on the decoder.
@@ -370,6 +379,10 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ for (ClientTransportFilter filter : transportFilters) {
+ filter.transportTerminated(attributes);
+ }
+
try {
logger.fine("Network channel is closed");
Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
@@ -696,6 +709,13 @@ public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
if (firstSettings) {
firstSettings = false;
lifecycleManager.notifyReady();
+
+ Attributes temporary = attributes;
+ for (ClientTransportFilter filter : transportFilters) {
+ temporary = Preconditions.checkNotNull(filter.transportReady(temporary),
+ "Filter %s returned null", filter);
+ }
+ attributes = temporary;
}
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index d595de69724..8471324f53a 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -25,6 +25,7 @@
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
+import io.grpc.ClientTransportFilter;
import io.grpc.InternalLogId;
import io.grpc.InternalTransportStats;
import io.grpc.Metadata;
@@ -51,6 +52,7 @@
import io.netty.util.AsciiString;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
@@ -87,6 +89,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private Status statusExplainingWhyTheChannelIsNull;
/** Since not thread-safe, may only be used from event loop. */
private ClientTransportLifecycleManager lifecycleManager;
+ private List transportFilters;
/** Since not thread-safe, may only be used from event loop. */
private final TransportTracer transportTracer;
@@ -96,7 +99,8 @@ class NettyClientTransport implements ConnectionClientTransport {
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
- Runnable tooManyPingsRunnable, TransportTracer transportTracer) {
+ Runnable tooManyPingsRunnable, List transportFilters,
+ TransportTracer transportTracer) {
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
this.address = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
@@ -112,6 +116,7 @@ class NettyClientTransport implements ConnectionClientTransport {
this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent));
this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
+ this.transportFilters = Preconditions.checkNotNull(transportFilters, "transportFilters");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
}
@@ -192,6 +197,7 @@ public Runnable start(Listener transportListener) {
maxHeaderListSize,
GrpcUtil.STOPWATCH_SUPPLIER,
tooManyPingsRunnable,
+ transportFilters,
transportTracer);
NettyHandlerSettings.setAutoWindow(handler);
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
index a3097ad402d..738a288001d 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
@@ -48,6 +48,7 @@
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import io.grpc.ClientTransportFilter;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
@@ -78,6 +79,7 @@
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
@@ -682,6 +684,7 @@ public Stopwatch get() {
maxHeaderListSize,
stopwatchSupplier,
tooManyPingsRunnable,
+ new ArrayList(),
transportTracer);
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index dc4c44e7989..381cf8eb447 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -38,6 +38,7 @@
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
+import io.grpc.ClientTransportFilter;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.Metadata;
@@ -80,6 +81,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -171,7 +173,7 @@ public void setSoLingerChannelOption() throws IOException {
address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */,
- tooManyPingsRunnable, new TransportTracer());
+ tooManyPingsRunnable, new ArrayList(), new TransportTracer());
transports.add(transport);
callMeMaybe(transport.start(clientTransportListener));
@@ -375,7 +377,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception {
address, CantConstructChannel.class, new HashMap, Object>(), group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
- null, tooManyPingsRunnable, new TransportTracer());
+ null, tooManyPingsRunnable, new ArrayList(), new TransportTracer());
transports.add(transport);
// Should not throw
@@ -513,6 +515,43 @@ public void keepAliveDisabled() throws Exception {
assertNull(transport.keepAliveManager());
}
+ @Test
+ public void transportFilter() throws Exception {
+ startServer();
+
+ final CountDownLatch countDownLatch = new CountDownLatch(2);
+
+ ClientTransportFilter transportFilter = new ClientTransportFilter() {
+ @Override
+ public Attributes transportReady(Attributes transportAttrs) {
+ countDownLatch.countDown();
+ return transportAttrs;
+ }
+
+ @Override
+ public void transportTerminated(Attributes transportAttrs) {
+ countDownLatch.countDown();
+ }
+ };
+
+ List transportFilters = new ArrayList();
+ transportFilters.add(transportFilter);
+
+ NettyClientTransport transport = new NettyClientTransport(
+ address, NioSocketChannel.class, new HashMap, Object>(), group,
+ newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
+ GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
+ KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
+ null, tooManyPingsRunnable, transportFilters, new TransportTracer());
+
+ callMeMaybe(transport.start(clientTransportListener));
+ Rpc rpc = new Rpc(transport).halfClose();
+ rpc.waitForResponse();
+
+ transport.shutdown(Status.UNAVAILABLE);
+ countDownLatch.await();
+ }
+
private Throwable getRootCause(Throwable t) {
if (t.getCause() == null) {
return t;
@@ -544,7 +583,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max
DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable,
- new TransportTracer());
+ new ArrayList(), new TransportTracer());
transports.add(transport);
return transport;
}