From af4a81470e1d20bcdd0b4f0aa41a532f448cd16f Mon Sep 17 00:00:00 2001 From: Xiaoshuang LU Date: Fri, 22 Dec 2017 17:47:49 +0800 Subject: [PATCH] introduce addTransportFilter to NettyChannel --- .../java/io/grpc/ClientTransportFilter.java | 58 +++++++++++++++++++ .../io/grpc/ForwardingChannelBuilder.java | 6 ++ .../java/io/grpc/ManagedChannelBuilder.java | 9 +++ .../AbstractManagedChannelImplBuilder.java | 11 ++++ .../io/grpc/internal/ManagedChannelImpl.java | 4 ++ .../ManagedChannelImplIdlenessTest.java | 3 +- .../grpc/internal/ManagedChannelImplTest.java | 4 +- .../io/grpc/netty/NettyChannelBuilder.java | 10 +++- .../io/grpc/netty/NettyClientHandler.java | 20 +++++++ .../io/grpc/netty/NettyClientTransport.java | 8 ++- .../io/grpc/netty/NettyClientHandlerTest.java | 3 + .../grpc/netty/NettyClientTransportTest.java | 45 +++++++++++++- 12 files changed, 172 insertions(+), 9 deletions(-) create mode 100644 core/src/main/java/io/grpc/ClientTransportFilter.java diff --git a/core/src/main/java/io/grpc/ClientTransportFilter.java b/core/src/main/java/io/grpc/ClientTransportFilter.java new file mode 100644 index 00000000000..98ce983490d --- /dev/null +++ b/core/src/main/java/io/grpc/ClientTransportFilter.java @@ -0,0 +1,58 @@ +/* + * Copyright 2017, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +/** + * Listens on client transport life-cycle events, with the capability to read and/or change + * transport attributes. Attributes returned by this filter will be merged into {@link + * ClientCall#getAttributes}. + * + *

Multiple filters maybe registered to a channel, in which case the output of a filter is the + * input of the next filter. For example, what returned by {@link #transportReady} of a filter is + * passed to the same method of the next filter, and the last filter's return value is the effective + * transport attributes. + * + *

{@link Grpc} defines commonly used attributes. + * + * @since 1.10.0 + */ +public abstract class ClientTransportFilter { + /** + * Called when a transport is ready to process streams. All necessary handshakes, e.g., TLS + * handshake, are done at this point. + * + *

Note the implementation should always inherit the passed-in attributes using {@code + * Attributes.newBuilder(transportAttrs)}, instead of creating one from scratch. + * + * @param transportAttrs current transport attributes + * + * @return new transport attributes. Default implementation returns the passed-in attributes + * intact. + */ + public Attributes transportReady(Attributes transportAttrs) { + return transportAttrs; + } + + /** + * Called when a transport is terminated. Default implementation is no-op. + * + * @param transportAttrs the effective transport attributes, which is what returned by {@link + * #transportReady} of the last executed filter. + */ + public void transportTerminated(Attributes transportAttrs) { + } +} diff --git a/core/src/main/java/io/grpc/ForwardingChannelBuilder.java b/core/src/main/java/io/grpc/ForwardingChannelBuilder.java index da9d8a3b2cf..bd172e0b605 100644 --- a/core/src/main/java/io/grpc/ForwardingChannelBuilder.java +++ b/core/src/main/java/io/grpc/ForwardingChannelBuilder.java @@ -79,6 +79,12 @@ public T intercept(ClientInterceptor... interceptors) { return thisT(); } + @Override + public T addTransportFilter(ClientTransportFilter filter) { + delegate().addTransportFilter(filter); + return thisT(); + } + @Override public T userAgent(String userAgent) { delegate().userAgent(userAgent); diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java index 53dc49a6ab2..672fa3871ed 100644 --- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java +++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java @@ -120,6 +120,15 @@ public static ManagedChannelBuilder forTarget(String target) { */ public abstract T intercept(ClientInterceptor... interceptors); + /** + * Adds a {@link ClientTransportFilter}. The order of filters being added is the order they will + * be executed. + * + * @return this + * @since 1.10.0 + */ + public abstract T addTransportFilter(ClientTransportFilter filter); + /** * Provides a custom {@code User-Agent} for the application. * diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 765170b8e45..67006b39a81 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.MoreExecutors; import io.grpc.Attributes; import io.grpc.ClientInterceptor; +import io.grpc.ClientTransportFilter; import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; import io.grpc.EquivalentAddressGroup; @@ -98,6 +99,9 @@ public static ManagedChannelBuilder forTarget(String target) { private final List interceptors = new ArrayList(); + protected final List transportFilters = + new ArrayList(); + // Access via getter, which may perform authority override as needed private NameResolver.Factory nameResolverFactory = DEFAULT_NAME_RESOLVER_FACTORY; @@ -205,6 +209,12 @@ public final T intercept(ClientInterceptor... interceptors) { return intercept(Arrays.asList(interceptors)); } + @Override + public final T addTransportFilter(ClientTransportFilter filter) { + this.transportFilters.add(filter); + return thisT(); + } + @Override public final T nameResolverFactory(NameResolver.Factory resolverFactory) { Preconditions.checkState(directServerAddress == null, @@ -345,6 +355,7 @@ public ManagedChannel build() { SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR), GrpcUtil.STOPWATCH_SUPPLIER, getEffectiveInterceptors(), + this.transportFilters, GrpcUtil.getProxyDetector(), ChannelTracer.getDefaultFactory()); } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 215d1a6c3ab..f49e23ac24d 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -34,6 +34,7 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; +import io.grpc.ClientTransportFilter; import io.grpc.CompressorRegistry; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; @@ -136,6 +137,7 @@ public final class ManagedChannelImpl extends ManagedChannel { * any interceptors this will just be {@link RealChannel}. */ private final Channel interceptorChannel; + private final List transportFilters; @Nullable private final String userAgent; // Only null after channel is terminated. Must be assigned from the channelExecutor. @@ -446,6 +448,7 @@ ClientStream newStream() { ObjectPool oobExecutorPool, Supplier stopwatchSupplier, List interceptors, + List transportFilters, ProxyDetector proxyDetector, ChannelTracer.Factory channelTracerFactory) { this.target = checkNotNull(builder.target, "target"); @@ -463,6 +466,7 @@ ClientStream newStream() { this.transportFactory = new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor); this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors); + this.transportFilters = checkNotNull(transportFilters, "transportFilters"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) { this.idleTimeoutMillis = builder.idleTimeoutMillis; diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java index b58f205fdda..f6025c6a933 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java @@ -36,6 +36,7 @@ import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.ClientTransportFilter; import io.grpc.EquivalentAddressGroup; import io.grpc.IntegerMarshaller; import io.grpc.LoadBalancer; @@ -142,7 +143,7 @@ class Builder extends AbstractManagedChannelImplBuilder { channel = new ManagedChannelImpl( builder, mockTransportFactory, new FakeBackoffPolicyProvider(), oobExecutorPool, timer.getStopwatchSupplier(), - Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), GrpcUtil.NOOP_PROXY_DETECTOR, ChannelTracer.getDefaultFactory()); newTransports = TestUtils.captureTransports(mockTransportFactory); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index a1efa7b271d..10fe0259658 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -56,6 +56,7 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ClientStreamTracer; +import io.grpc.ClientTransportFilter; import io.grpc.ConnectivityStateInfo; import io.grpc.Context; import io.grpc.EquivalentAddressGroup; @@ -229,7 +230,8 @@ class Builder extends AbstractManagedChannelImplBuilder { checkState(channel == null); channel = new ManagedChannelImpl( builder, mockTransportFactory, new FakeBackoffPolicyProvider(), - oobExecutorPool, timer.getStopwatchSupplier(), interceptors, GrpcUtil.NOOP_PROXY_DETECTOR, + oobExecutorPool, timer.getStopwatchSupplier(), interceptors, + Collections.emptyList(), GrpcUtil.NOOP_PROXY_DETECTOR, channelStatsFactory); if (requestConnection) { diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 92ef0ca728d..2416fd03659 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -27,6 +27,7 @@ import com.google.common.base.Preconditions; import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.grpc.Attributes; +import io.grpc.ClientTransportFilter; import io.grpc.ExperimentalApi; import io.grpc.Internal; import io.grpc.NameResolver; @@ -47,6 +48,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -322,7 +324,7 @@ protected ClientTransportFactory buildTransportFactory() { return new NettyTransportFactory(dynamicParamsFactory, channelType, channelOptions, negotiationType, sslContext, eventLoopGroup, flowControlWindow, maxInboundMessageSize(), maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, - transportTracerFactory.create()); + transportFilters, transportTracerFactory.create()); } @Override @@ -458,6 +460,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto private final AtomicBackoff keepAliveTimeNanos; private final long keepAliveTimeoutNanos; private final boolean keepAliveWithoutCalls; + private final List transportFilters; private final TransportTracer transportTracer; private boolean closed; @@ -467,10 +470,11 @@ private static final class NettyTransportFactory implements ClientTransportFacto NegotiationType negotiationType, SslContext sslContext, EventLoopGroup group, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, - TransportTracer transportTracer) { + List transportFilters, TransportTracer transportTracer) { this.channelType = channelType; this.negotiationType = negotiationType; this.channelOptions = new HashMap, Object>(channelOptions); + this.transportFilters = transportFilters; this.transportTracer = transportTracer; if (transportCreationParamsFilterFactory == null) { @@ -515,7 +519,7 @@ public void run() { dparams.getProtocolNegotiator(), flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(), - tooManyPingsRunnable, transportTracer); + tooManyPingsRunnable, transportFilters, transportTracer); return transport; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index f680cd8e9c4..0cae711b18a 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -24,6 +24,7 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import io.grpc.Attributes; +import io.grpc.ClientTransportFilter; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusException; @@ -69,6 +70,7 @@ import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor; import io.netty.handler.logging.LogLevel; import java.nio.channels.ClosedChannelException; +import java.util.List; import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; @@ -99,6 +101,7 @@ class NettyClientHandler extends AbstractNettyHandler { private final KeepAliveManager keepAliveManager; // Returns new unstarted stopwatches private final Supplier stopwatchFactory; + private final List transportFilters; private final TransportTracer transportTracer; private WriteQueue clientWriteQueue; private Http2Ping ping; @@ -111,6 +114,7 @@ static NettyClientHandler newHandler( int maxHeaderListSize, Supplier stopwatchFactory, Runnable tooManyPingsRunnable, + List transportFilters, TransportTracer transportTracer) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); @@ -133,6 +137,7 @@ static NettyClientHandler newHandler( maxHeaderListSize, stopwatchFactory, tooManyPingsRunnable, + transportFilters, transportTracer); } @@ -147,6 +152,7 @@ static NettyClientHandler newHandler( int maxHeaderListSize, Supplier stopwatchFactory, Runnable tooManyPingsRunnable, + List transportFilters, TransportTracer transportTracer) { Preconditions.checkNotNull(connection, "connection"); Preconditions.checkNotNull(frameReader, "frameReader"); @@ -196,6 +202,7 @@ public TransportTracer.FlowControlWindows read() { keepAliveManager, stopwatchFactory, tooManyPingsRunnable, + transportFilters, transportTracer); } @@ -207,11 +214,13 @@ private NettyClientHandler( KeepAliveManager keepAliveManager, Supplier stopwatchFactory, final Runnable tooManyPingsRunnable, + List transportFilters, TransportTracer transportTracer) { super(decoder, encoder, settings); this.lifecycleManager = lifecycleManager; this.keepAliveManager = keepAliveManager; this.stopwatchFactory = stopwatchFactory; + this.transportFilters = Preconditions.checkNotNull(transportFilters); this.transportTracer = Preconditions.checkNotNull(transportTracer); // Set the frame listener on the decoder. @@ -370,6 +379,10 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + for (ClientTransportFilter filter : transportFilters) { + filter.transportTerminated(attributes); + } + try { logger.fine("Network channel is closed"); Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason"); @@ -696,6 +709,13 @@ public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { if (firstSettings) { firstSettings = false; lifecycleManager.notifyReady(); + + Attributes temporary = attributes; + for (ClientTransportFilter filter : transportFilters) { + temporary = Preconditions.checkNotNull(filter.transportReady(temporary), + "Filter %s returned null", filter); + } + attributes = temporary; } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index d595de69724..8471324f53a 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; import io.grpc.CallOptions; +import io.grpc.ClientTransportFilter; import io.grpc.InternalLogId; import io.grpc.InternalTransportStats; import io.grpc.Metadata; @@ -51,6 +52,7 @@ import io.netty.util.AsciiString; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; +import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.logging.Logger; @@ -87,6 +89,7 @@ class NettyClientTransport implements ConnectionClientTransport { private Status statusExplainingWhyTheChannelIsNull; /** Since not thread-safe, may only be used from event loop. */ private ClientTransportLifecycleManager lifecycleManager; + private List transportFilters; /** Since not thread-safe, may only be used from event loop. */ private final TransportTracer transportTracer; @@ -96,7 +99,8 @@ class NettyClientTransport implements ConnectionClientTransport { ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, - Runnable tooManyPingsRunnable, TransportTracer transportTracer) { + Runnable tooManyPingsRunnable, List transportFilters, + TransportTracer transportTracer) { this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator"); this.address = Preconditions.checkNotNull(address, "address"); this.group = Preconditions.checkNotNull(group, "group"); @@ -112,6 +116,7 @@ class NettyClientTransport implements ConnectionClientTransport { this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent)); this.tooManyPingsRunnable = Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); + this.transportFilters = Preconditions.checkNotNull(transportFilters, "transportFilters"); this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); } @@ -192,6 +197,7 @@ public Runnable start(Listener transportListener) { maxHeaderListSize, GrpcUtil.STOPWATCH_SUPPLIER, tooManyPingsRunnable, + transportFilters, transportTracer); NettyHandlerSettings.setAutoWindow(handler); diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index a3097ad402d..738a288001d 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -48,6 +48,7 @@ import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.MoreExecutors; import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.grpc.ClientTransportFilter; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusException; @@ -78,6 +79,7 @@ import io.netty.handler.codec.http2.Http2Stream; import io.netty.util.AsciiString; import java.io.InputStream; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -682,6 +684,7 @@ public Stopwatch get() { maxHeaderListSize, stopwatchSupplier, tooManyPingsRunnable, + new ArrayList(), transportTracer); } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index dc4c44e7989..381cf8eb447 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -38,6 +38,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; import io.grpc.CallOptions; +import io.grpc.ClientTransportFilter; import io.grpc.Context; import io.grpc.Grpc; import io.grpc.Metadata; @@ -80,6 +81,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -171,7 +173,7 @@ public void setSoLingerChannelOption() throws IOException { address, NioSocketChannel.class, channelOptions, group, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, - tooManyPingsRunnable, new TransportTracer()); + tooManyPingsRunnable, new ArrayList(), new TransportTracer()); transports.add(transport); callMeMaybe(transport.start(clientTransportListener)); @@ -375,7 +377,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception { address, CantConstructChannel.class, new HashMap, Object>(), group, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, - null, tooManyPingsRunnable, new TransportTracer()); + null, tooManyPingsRunnable, new ArrayList(), new TransportTracer()); transports.add(transport); // Should not throw @@ -513,6 +515,43 @@ public void keepAliveDisabled() throws Exception { assertNull(transport.keepAliveManager()); } + @Test + public void transportFilter() throws Exception { + startServer(); + + final CountDownLatch countDownLatch = new CountDownLatch(2); + + ClientTransportFilter transportFilter = new ClientTransportFilter() { + @Override + public Attributes transportReady(Attributes transportAttrs) { + countDownLatch.countDown(); + return transportAttrs; + } + + @Override + public void transportTerminated(Attributes transportAttrs) { + countDownLatch.countDown(); + } + }; + + List transportFilters = new ArrayList(); + transportFilters.add(transportFilter); + + NettyClientTransport transport = new NettyClientTransport( + address, NioSocketChannel.class, new HashMap, Object>(), group, + newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, + GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, + KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, + null, tooManyPingsRunnable, transportFilters, new TransportTracer()); + + callMeMaybe(transport.start(clientTransportListener)); + Rpc rpc = new Rpc(transport).halfClose(); + rpc.waitForResponse(); + + transport.shutdown(Status.UNAVAILABLE); + countDownLatch.await(); + } + private Throwable getRootCause(Throwable t) { if (t.getCause() == null) { return t; @@ -544,7 +583,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, keepAliveTimeNano, keepAliveTimeoutNano, false, authority, userAgent, tooManyPingsRunnable, - new TransportTracer()); + new ArrayList(), new TransportTracer()); transports.add(transport); return transport; }