diff --git a/.codecov.yml b/.codecov.yml new file mode 100644 index 00000000..1f309f94 --- /dev/null +++ b/.codecov.yml @@ -0,0 +1,12 @@ +# Controls the behavior of the codecov.io integration. +# See https://github.com/codecov/support/wiki/Codecov-Yaml for details on the options. +coverage: + status: + project: + default: + enabled: yes + target: 85% + patch: + default: + enabled: yes + target: 90% diff --git a/riposte-core/src/main/java/com/nike/riposte/client/asynchttp/netty/StreamingAsyncHttpClient.java b/riposte-core/src/main/java/com/nike/riposte/client/asynchttp/netty/StreamingAsyncHttpClient.java index 57764b6b..e1758cfa 100644 --- a/riposte-core/src/main/java/com/nike/riposte/client/asynchttp/netty/StreamingAsyncHttpClient.java +++ b/riposte-core/src/main/java/com/nike/riposte/client/asynchttp/netty/StreamingAsyncHttpClient.java @@ -42,6 +42,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; import io.netty.channel.CombinedChannelDuplexHandler; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; @@ -138,12 +139,23 @@ public static class StreamingChannel { protected final Channel channel; protected final ChannelPool pool; protected final ObjectHolder callActiveHolder; + protected final ObjectHolder downstreamLastChunkSentHolder; + protected final Deque distributedTracingSpanStack; + protected final Map distributedTracingMdcInfo; protected boolean channelClosedDueToUnrecoverableError = false; - StreamingChannel(Channel channel, ChannelPool pool, ObjectHolder callActiveHolder) { + StreamingChannel(Channel channel, + ChannelPool pool, + ObjectHolder callActiveHolder, + ObjectHolder downstreamLastChunkSentHolder, + Deque distributedTracingSpanStack, + Map distributedTracingMdcInfo) { this.channel = channel; this.pool = pool; this.callActiveHolder = callActiveHolder; + this.downstreamLastChunkSentHolder = downstreamLastChunkSentHolder; + this.distributedTracingSpanStack = distributedTracingSpanStack; + this.distributedTracingMdcInfo = distributedTracingMdcInfo; } /** @@ -158,28 +170,170 @@ public static class StreamingChannel { * @return The {@link ChannelFuture} that will tell you if the write succeeded. */ public ChannelFuture streamChunk(HttpContent chunkToWrite) { - if (channelClosedDueToUnrecoverableError) { - chunkToWrite.release(); - return channel.newFailedFuture(new RuntimeException( - "Unable to stream chunks downstream - " - + "the channel was closed previously due to an unrecoverable error" - )); + try { + ChannelPromise result = channel.newPromise(); + + channel.eventLoop().execute( + () -> doStreamChunk(chunkToWrite).addListener(future -> { + if (future.isCancelled()) { + result.cancel(true); + } + else if (future.isSuccess()) { + result.setSuccess(); + } + else if (future.cause() != null) { + result.setFailure(future.cause()); + } + else { + runnableWithTracingAndMdc( + () -> logger.error( + "Found a future with no result. This should not be possible. Failing the future. " + + "future_done={}, future_success={}, future_cancelled={}, future_failure_cause={}", + future.isDone(), future.isSuccess(), future.isCancelled(), future.cause() + ), + distributedTracingSpanStack, distributedTracingMdcInfo + ).run(); + result.setFailure( + new RuntimeException("Received ChannelFuture that was in an impossible state") + ); + } + }) + ); + + return result; + } + catch(Throwable t) { + String errorMsg = + "StreamingChannel.streamChunk() threw an exception. This should not be possible and indicates " + + "something went wrong with a Netty write() and flush(). If you see Netty memory leak warnings " + + "then this could be why. Please report this along with the stack trace to " + + "https://github.com/Nike-Inc/riposte/issues/new"; + + Exception exceptionToPass = new RuntimeException(errorMsg, t); + + logger.error(errorMsg, exceptionToPass); + return channel.newFailedFuture(exceptionToPass); } + } + + protected ChannelFuture doStreamChunk(HttpContent chunkToWrite) { + // We are in the channel's event loop. Do some final checks to make sure it's still ok to write and flush + // the message, then do it (or handle the special cases appropriately). + try { + if (downstreamLastChunkSentHolder.heldObject + && (chunkToWrite instanceof LastHttpContent) + && chunkToWrite.content().readableBytes() == 0 + ) { + // A LastHttpContent has already been written downstream. This is valid/legal when the downstream call + // has a content-length header, and the downstream pipeline encoder realizes there's no more + // content to write and generates a synthetic empty LastHttpContent rather than waiting for this + // one to arrive. Therefore there's nothing to do but release the content chunk and return an + // already-successfully-completed future. + if (logger.isDebugEnabled()) { + runnableWithTracingAndMdc( + () -> logger.warn("Ignoring zero-length LastHttpContent from upstream, because a " + + "LastHttpContent has already been sent downstream."), + distributedTracingSpanStack, distributedTracingMdcInfo + ).run(); + } + chunkToWrite.release(); + return channel.newSucceededFuture(); + } + + if (!callActiveHolder.heldObject) { + chunkToWrite.release(); + return channel.newFailedFuture( + new RuntimeException("Unable to stream chunk - downstream call is no longer active.") + ); + } + + if (channelClosedDueToUnrecoverableError) { + chunkToWrite.release(); + return channel.newFailedFuture( + new RuntimeException("Unable to stream chunks downstream - the channel was closed previously " + + "due to an unrecoverable error") + ); + } + + return channel.writeAndFlush(chunkToWrite); + } + catch(Throwable t) { + String errorMsg = + "StreamingChannel.doStreamChunk() threw an exception. This should not be possible and indicates " + + "something went wrong with a Netty write() and flush(). If you see Netty memory leak warnings " + + "then this could be why. Please report this along with the stack trace to " + + "https://github.com/Nike-Inc/riposte/issues/new"; + + Exception exceptionToPass = new RuntimeException(errorMsg, t); + + logger.error(errorMsg, exceptionToPass); + return channel.newFailedFuture(exceptionToPass); + } + } + + private static final Logger logger = LoggerFactory.getLogger(StreamingChannel.class); - return channel.writeAndFlush(chunkToWrite); + public Channel getChannel() { + return channel; } - public void closeChannelDueToUnrecoverableError() { - channelClosedDueToUnrecoverableError = true; + public void closeChannelDueToUnrecoverableError(Throwable cause) { + try { + // Ignore subsequent calls to this method, and only try to do something if the call is still active. + // If the call is *not* active, then everything has already been cleaned up and we shouldn't + // do anything because the channel might have already been handed out for a different call. + if (!channelClosedDueToUnrecoverableError && callActiveHolder.heldObject) { + // Schedule the close on the channel's event loop. + channel.eventLoop().execute(() -> doCloseChannelDueToUnrecoverableError(cause)); + } + + if (logger.isDebugEnabled()) { + runnableWithTracingAndMdc( + () -> logger.debug( + "Ignoring call to StreamingChannel.closeChannelDueToUnrecoverableError() because it " + + "has already been called, or the call is no longer active. " + + "previously_called={}, call_is_active={}", + channelClosedDueToUnrecoverableError, callActiveHolder.heldObject + ), + distributedTracingSpanStack, distributedTracingMdcInfo + ).run(); + } + } + finally { + channelClosedDueToUnrecoverableError = true; + } + } + + protected void doCloseChannelDueToUnrecoverableError(Throwable cause) { + // We should now be in the channel's event loop. Do a final check to make sure the call is still active + // since it could have closed while we were waiting to run on the event loop. + if (callActiveHolder.heldObject) { + runnableWithTracingAndMdc( + () -> logger.error("Closing StreamingChannel due to unrecoverable error. " + + "channel_id={}, unrecoverable_error={}", + channel.toString(), String.valueOf(cause) + ), + distributedTracingSpanStack, distributedTracingMdcInfo + ).run(); - // Mark the channel as broken so it will be closed and removed from the pool when it is returned. - markChannelAsBroken(channel); + // Mark the channel as broken so it will be closed and removed from the pool when it is returned. + markChannelAsBroken(channel); - // Release it back to the pool if possible/necessary so the pool can do its usual cleanup. - releaseChannelBackToPoolIfCallIsActive(channel, pool, callActiveHolder); + // Release it back to the pool if possible/necessary so the pool can do its usual cleanup. + releaseChannelBackToPoolIfCallIsActive( + channel, pool, callActiveHolder, + "closing StreamingChannel due to unrecoverable error: " + String.valueOf(cause), + distributedTracingSpanStack, distributedTracingMdcInfo + ); - // No matter what the cause is we want to make sure the channel is closed. - channel.close(); + // No matter what the cause is we want to make sure the channel is closed. + channel.close(); + } + else { + logger.debug("The call deactivated before we could close the StreamingChannel. Therefore there's " + + "nothing to do for this unrecoverable error as any necessary cleanup has already been " + + "done. ignored_unrecoverable_error={}", cause.toString()); + } } } @@ -532,11 +686,13 @@ public CompletableFuture streamDownstreamCall( try { ObjectHolder callActiveHolder = new ObjectHolder<>(); callActiveHolder.heldObject = true; + ObjectHolder lastChunkSentDownstreamHolder = new ObjectHolder<>(); + lastChunkSentDownstreamHolder.heldObject = false; //noinspection ConstantConditions prepChannelForDownstreamCall( pool, ch, callback, distributedSpanStackToUse, mdcContextToUse, isSecureHttpsCall, relaxedHttpsValidation, performSubSpanAroundDownstreamCalls, downstreamCallTimeoutMillis, - callActiveHolder + callActiveHolder, lastChunkSentDownstreamHolder ); logInitialRequestChunk(initialRequestChunk, downstreamHost, downstreamPort); @@ -548,7 +704,10 @@ public CompletableFuture streamDownstreamCall( // for any further chunk streaming writeFuture.addListener(completedWriteFuture -> { if (completedWriteFuture.isSuccess()) - streamingChannel.complete(new StreamingChannel(ch, pool, callActiveHolder)); + streamingChannel.complete(new StreamingChannel( + ch, pool, callActiveHolder, lastChunkSentDownstreamHolder, + distributedSpanStackToUse, mdcContextToUse + )); else { prepChannelErrorHandler.accept( "Writing the first HttpRequest chunk to the downstream service failed.", @@ -605,7 +764,7 @@ protected void prepChannelForDownstreamCall( ChannelPool pool, Channel ch, StreamingCallback callback, Deque distributedSpanStackToUse, Map mdcContextToUse, boolean isSecureHttpsCall, boolean relaxedHttpsValidation, boolean performSubSpanAroundDownstreamCalls, long downstreamCallTimeoutMillis, - ObjectHolder callActiveHolder + ObjectHolder callActiveHolder, ObjectHolder lastChunkSentDownstreamHolder ) throws SSLException, NoSuchAlgorithmException, KeyStoreException { ChannelHandler chunkSenderHandler = new SimpleChannelInboundHandler() { @@ -615,17 +774,21 @@ protected void channelRead0(ChannelHandlerContext downstreamCallCtx, HttpObject // Only do the distributed trace and callback work if the call is active. Messages that pop up after // the call is fully processed should not trigger the behavior a second time. if (callActiveHolder.heldObject) { - if (msg instanceof LastHttpContent && performSubSpanAroundDownstreamCalls) { - // Complete the subspan. - runnableWithTracingAndMdc( - () -> { - if (distributedSpanStackToUse == null || distributedSpanStackToUse.size() < 2) - Tracer.getInstance().completeRequestSpan(); - else - Tracer.getInstance().completeSubSpan(); - }, - distributedSpanStackToUse, mdcContextToUse - ).run(); + if (msg instanceof LastHttpContent) { + lastChunkSentDownstreamHolder.heldObject = true; + + if (performSubSpanAroundDownstreamCalls) { + // Complete the subspan. + runnableWithTracingAndMdc( + () -> { + if (distributedSpanStackToUse == null || distributedSpanStackToUse.size() < 2) + Tracer.getInstance().completeRequestSpan(); + else + Tracer.getInstance().completeSubSpan(); + }, + distributedSpanStackToUse, mdcContextToUse + ).run(); + } } HttpObject msgToPass = msg; @@ -662,8 +825,10 @@ protected void channelRead0(ChannelHandlerContext downstreamCallCtx, HttpObject } } finally { - if (msg instanceof LastHttpContent) - releaseChannelBackToPoolIfCallIsActive(ch, pool, callActiveHolder); + if (msg instanceof LastHttpContent) { + releaseChannelBackToPoolIfCallIsActive(ch, pool, callActiveHolder, "last content chunk sent", + distributedSpanStackToUse, mdcContextToUse); + } } } }; @@ -719,7 +884,10 @@ protected void channelRead0(ChannelHandlerContext downstreamCallCtx, HttpObject markChannelAsBroken(ch); // Release it back to the pool if possible/necessary so the pool can do its usual cleanup. - releaseChannelBackToPoolIfCallIsActive(ch, pool, callActiveHolder); + releaseChannelBackToPoolIfCallIsActive( + ch, pool, callActiveHolder, "error received in downstream pipeline: " + cause.toString(), + distributedSpanStackToUse, mdcContextToUse + ); // No matter what the cause is we want to make sure the channel is closed. Doing this raw ch.close() // here will catch the cases where this channel does not have an active call but still needs to be @@ -739,6 +907,17 @@ public void exceptionCaught(ChannelHandlerContext downstreamCallCtx, Throwable c @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + if (logger.isDebugEnabled()) { + runnableWithTracingAndMdc( + () -> logger.debug( + "Downstream channel closing. call_active={}, last_chunk_sent_downstream={}, channel_id={}", + callActiveHolder.heldObject, lastChunkSentDownstreamHolder.heldObject, + ctx.channel().toString() + ), + distributedSpanStackToUse, mdcContextToUse + ).run(); + } + // We only care if the channel was closed while the call was active. if (callActiveHolder.heldObject) doErrorHandlingConsumer.accept(new DownstreamChannelClosedUnexpectedlyException(ch)); @@ -811,19 +990,25 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { HttpClientCodec currentCodec = (HttpClientCodec) p.get(HTTP_CLIENT_CODEC_HANDLER_NAME); int currentHttpClientCodecInboundState = determineHttpClientCodecInboundState(currentCodec); if (currentHttpClientCodecInboundState != 0) { - logger.warn( - "HttpClientCodec inbound state was not 0. It will be replaced with a fresh HttpClientCodec. " - + "bad_httpclientcodec_inbound_state={}", currentHttpClientCodecInboundState - ); + runnableWithTracingAndMdc( + () -> logger.warn( + "HttpClientCodec inbound state was not 0. It will be replaced with a fresh HttpClientCodec. " + + "bad_httpclientcodec_inbound_state={}", currentHttpClientCodecInboundState + ), + distributedSpanStackToUse, mdcContextToUse + ).run(); existingHttpClientCodecIsInBadState = true; } else { int currentHttpClientCodecOutboundState = determineHttpClientCodecOutboundState(currentCodec); if (currentHttpClientCodecOutboundState != 0) { - logger.warn( - "HttpClientCodec outbound state was not 0. It will be replaced with a fresh HttpClientCodec. " - + "bad_httpclientcodec_outbound_state={}", currentHttpClientCodecOutboundState - ); + runnableWithTracingAndMdc( + () -> logger.warn( + "HttpClientCodec outbound state was not 0. It will be replaced with a fresh HttpClientCodec. " + + "bad_httpclientcodec_outbound_state={}", currentHttpClientCodecOutboundState + ), + distributedSpanStackToUse, mdcContextToUse + ).run(); existingHttpClientCodecIsInBadState = true; } } @@ -916,8 +1101,9 @@ protected static void markChannelBrokenAndLogInfoIfHttpClientCodecStateIsNotZero boolean channelAlreadyBroken = channelIsMarkedAsBeingBroken(ch); logger.warn( "HttpClientCodec inbound state was not 0. The channel will be marked as broken so it won't be " - + "used. bad_httpclientcodec_inbound_state={}, channel_already_broken={}, call_context=\"{}\"", - currentHttpClientCodecInboundState, channelAlreadyBroken, callContextForLogs + + "used. bad_httpclientcodec_inbound_state={}, channel_already_broken={}, channel_id={}, " + + "call_context=\"{}\"", + currentHttpClientCodecInboundState, channelAlreadyBroken, ch.toString(), callContextForLogs ); markChannelAsBroken(ch); } @@ -927,8 +1113,9 @@ protected static void markChannelBrokenAndLogInfoIfHttpClientCodecStateIsNotZero boolean channelAlreadyBroken = channelIsMarkedAsBeingBroken(ch); logger.warn( "HttpClientCodec outbound state was not 0. The channel will be marked as broken so it won't be " - + "used. bad_httpclientcodec_outbound_state={}, channel_already_broken={}, call_context=\"{}\"", - currentHttpClientCodecOutboundState, channelAlreadyBroken, callContextForLogs + + "used. bad_httpclientcodec_outbound_state={}, channel_already_broken={}, channel_id={}, " + + "call_context=\"{}\"", + currentHttpClientCodecOutboundState, channelAlreadyBroken, ch.toString(), callContextForLogs ); markChannelAsBroken(ch); } @@ -945,8 +1132,21 @@ protected void addOrReplacePipelineHandler(ChannelHandler handler, String handle } protected static void releaseChannelBackToPoolIfCallIsActive(Channel ch, ChannelPool pool, - ObjectHolder callActiveHolder) { + ObjectHolder callActiveHolder, + String contextReason, + Deque distributedTracingStack, + Map distributedTracingMdcInfo) { if (callActiveHolder.heldObject) { + if (logger.isDebugEnabled()) { + runnableWithTracingAndMdc( + () -> logger.debug( + "Marking call as inactive and releasing channel back to pool. " + + "channel_release_reason=\"{}\"", contextReason + ), + distributedTracingStack, distributedTracingMdcInfo + ).run(); + } + callActiveHolder.heldObject = false; pool.release(ch); } diff --git a/riposte-core/src/main/java/com/nike/riposte/server/handler/ChannelPipelineFinalizerHandler.java b/riposte-core/src/main/java/com/nike/riposte/server/handler/ChannelPipelineFinalizerHandler.java index 66f1c5da..26902f8d 100644 --- a/riposte-core/src/main/java/com/nike/riposte/server/handler/ChannelPipelineFinalizerHandler.java +++ b/riposte-core/src/main/java/com/nike/riposte/server/handler/ChannelPipelineFinalizerHandler.java @@ -198,6 +198,13 @@ protected void finalizeChannelPipeline(ChannelHandlerContext ctx, Object msg, Ht // If we're in an error case (cause != null) and the response sending has started but not completed, then this // request is broken. We can't do anything except kill the channel. if ((cause != null) && state.isResponseSendingStarted() && !state.isResponseSendingLastChunkSent()) { + runnableWithTracingAndMdc( + () -> logger.error( + "Received an error in ChannelPipelineFinalizerHandler after response sending was started, but " + + "before it finished. Closing the channel. unexpected_error={}", cause.toString() + ), + ctx + ).run(); ctx.channel().close(); } } @@ -221,8 +228,25 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx) ProxyRouterProcessingState proxyRouterState = ChannelAttributes.getProxyRouterProcessingStateForChannel(ctx).get(); - RequestInfo requestInfo = (httpState == null) ? null : httpState.getRequestInfo(); - ResponseInfo responseInfo = (httpState == null) ? null : httpState.getResponseInfo(); + if (httpState == null) { + if (proxyRouterState == null) { + logger.debug("This channel closed before it processed any requests. Nothing to cleanup. " + + "current_span={}", Tracer.getInstance().getCurrentSpan()); + } + else { + // This should never happen, but if it does we'll try to release what we can and then return. + logger.error("Found a channel where HttpProcessingState was null, but ProxyRouterProcessingState " + + "was not null. This should not be possible! " + + "current_span={}", Tracer.getInstance().getCurrentSpan()); + releaseProxyRouterStateResources(proxyRouterState); + } + + // With httpState null, there's nothing left for us to do. + return PipelineContinuationBehavior.CONTINUE; + } + + RequestInfo requestInfo = httpState.getRequestInfo(); + ResponseInfo responseInfo = httpState.getResponseInfo(); if (logger.isDebugEnabled()) { runnableWithTracingAndMdc( @@ -238,7 +262,7 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx) // debug investigations can find out what happened. // TODO: Is there a way we can handle access logging and/or metrics here, but only if it wasn't done elsewhere? @SuppressWarnings("SimplifiableConditionalExpression") - boolean tracingAlreadyCompleted = (httpState == null) ? true : httpState.isTraceCompletedOrScheduled(); + boolean tracingAlreadyCompleted = httpState.isTraceCompletedOrScheduled(); boolean responseNotFullySent = responseInfo == null || !responseInfo.isResponseSendingLastChunkSent(); if (responseNotFullySent || !tracingAlreadyCompleted) { runnableWithTracingAndMdc( @@ -248,7 +272,8 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx) "The caller's channel was closed before a response could be sent. This means that " + "an access log probably does not exist for this request. Distributed tracing " + "will be completed now if it wasn't already done. Any dangling resources will be " - + "released." + + "released. response_info_is_null={}", + (responseInfo == null) ); } @@ -268,16 +293,7 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx) if (requestInfo != null) requestInfo.releaseAllResources(); - // Tell the ProxyRouterProcessingState that the stream failed and trigger its chunk streaming error handling - // with an artificial exception. If the call had already succeeded previously then this will do - // nothing, but if it hasn't already succeeded then it's not going to (since the connection is closing) - // and doing this will cause any resources its holding onto to be released. - if (proxyRouterState != null) { - proxyRouterState.setStreamingFailed(); - proxyRouterState.triggerStreamingChannelErrorForChunks( - new RuntimeException("Server worker channel closed") - ); - } + releaseProxyRouterStateResources(proxyRouterState); } catch(Throwable t) { runnableWithTracingAndMdc( @@ -290,4 +306,21 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx) return PipelineContinuationBehavior.CONTINUE; } + + /** + * Tell the ProxyRouterProcessingState that the stream failed and trigger its chunk streaming error handling with an + * artificial exception. If the call had already succeeded previously then this will do nothing, but if it hasn't + * already succeeded then it's not going to (since the connection is closing) and doing this will cause any + * resources its holding onto to be released. + * + * @param proxyRouterState The state to cleanup. + */ + protected void releaseProxyRouterStateResources(ProxyRouterProcessingState proxyRouterState) { + if (proxyRouterState != null) { + proxyRouterState.setStreamingFailed(); + proxyRouterState.triggerStreamingChannelErrorForChunks( + new RuntimeException("Server worker channel closed") + ); + } + } } diff --git a/riposte-core/src/main/java/com/nike/riposte/server/handler/IdleChannelTimeoutHandler.java b/riposte-core/src/main/java/com/nike/riposte/server/handler/IdleChannelTimeoutHandler.java index eb9dab20..5f2ee59d 100644 --- a/riposte-core/src/main/java/com/nike/riposte/server/handler/IdleChannelTimeoutHandler.java +++ b/riposte-core/src/main/java/com/nike/riposte/server/handler/IdleChannelTimeoutHandler.java @@ -46,10 +46,12 @@ protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws * log message. */ protected void channelIdleTriggered(ChannelHandlerContext ctx, IdleStateEvent evt) { - logger.debug( - "Closing server channel due to idle timeout. " - + "custom_handler_id={}, idle_timeout_millis={}, worker_channel_being_closed={}", - customHandlerIdForLogs, idleTimeoutMillis, ctx.channel().toString() - ); + if (logger.isDebugEnabled()) { + logger.debug( + "Closing server channel due to idle timeout. " + + "custom_handler_id={}, idle_timeout_millis={}, worker_channel_being_closed={}", + customHandlerIdForLogs, idleTimeoutMillis, ctx.channel().toString() + ); + } } } diff --git a/riposte-core/src/main/java/com/nike/riposte/server/handler/ProxyRouterEndpointExecutionHandler.java b/riposte-core/src/main/java/com/nike/riposte/server/handler/ProxyRouterEndpointExecutionHandler.java index 33607cea..92938455 100644 --- a/riposte-core/src/main/java/com/nike/riposte/server/handler/ProxyRouterEndpointExecutionHandler.java +++ b/riposte-core/src/main/java/com/nike/riposte/server/handler/ProxyRouterEndpointExecutionHandler.java @@ -237,7 +237,7 @@ else if (msg instanceof HttpContent) { // and we're done. If not, then we call registerChunkStreamingAction() to set up the // chunk-streaming behavior and subsequent cleanup for the given HttpContent. if (!releaseContentChunkIfStreamAlreadyFailed(msgContent, proxyRouterState)) { - registerChunkStreamingAction(proxyRouterState, msgContent, requestInfo, ctx); + registerChunkStreamingAction(proxyRouterState, msgContent, ctx); } } @@ -251,7 +251,6 @@ else if (msg instanceof HttpContent) { protected void registerChunkStreamingAction( ProxyRouterProcessingState proxyRouterState, HttpContent msgContent, - RequestInfo requestInfo, ChannelHandlerContext ctx ) { // We have a content chunk to stream downstream. Attach the chunk processing to the proxyRouterState and @@ -266,58 +265,50 @@ protected void registerChunkStreamingAction( if (cause == null) { // Nothing has blown up yet, so stream this next chunk downstream. Calling streamChunk() will decrement - // the chunk's reference count, allowing it to be destroyed since this should be the last handle - // on the chunk's memory. - ChannelFuture writeFuture; - try { - writeFuture = sc.streamChunk(msgContent); - } - catch(Throwable t) { - logger.error( - "StreamingAsyncHttpClient.streamChunk() threw an error. This should not be possible and " - + "indicates something went wrong with a Netty write() and flush(). If you see Netty memory " - + "leak warnings then this could be why. Please report this along with the stack trace to " - + "https://github.com/Nike-Inc/riposte/issues/new", - t - ); - return; - } - + // the chunk's reference count (at some point in the future), allowing it to be destroyed since + // this should be the last handle on the chunk's memory. + ChannelFuture writeFuture = sc.streamChunk(msgContent); writeFuture.addListener(future -> { // The chunk streaming is complete, one way or another. React appropriately if there was // a problem. if (!future.isSuccess()) { - String errorMsg = "Chunk streaming future came back as being unsuccessful."; - Throwable errorToFire = new WrapperException(errorMsg, future.cause()); - sc.closeChannelDueToUnrecoverableError(); - StreamingCallback callback = proxyRouterState.getStreamingCallback(); - if (callback != null) - callback.unrecoverableErrorOccurred(errorToFire); - else { - // We have to set proxyRouterState.setStreamingFailed() here since we couldn't - // call callback.unrecoverableErrorOccurred(...); - proxyRouterState.setStreamingFailed(); - runnableWithTracingAndMdc( - () -> logger.error( - "Unrecoverable error occurred and somehow the StreamingCallback was " - + "not available. This should not be possible. Firing the following " - + "error down the pipeline manually: " + errorMsg, - errorToFire), - ctx - ).run(); - executeOnlyIfChannelIsActive( - ctx, - "ProxyRouterEndpointExecutionHandler-streamchunk-writefuture-unsuccessful", - () -> ctx.fireExceptionCaught(errorToFire) - ); + try { + String errorMsg = "Chunk streaming ChannelFuture came back as being unsuccessful. " + + "downstream_channel_id=" + sc.getChannel().toString(); + Throwable errorToFire = new WrapperException(errorMsg, future.cause()); + StreamingCallback callback = proxyRouterState.getStreamingCallback(); + if (callback != null) + callback.unrecoverableErrorOccurred(errorToFire); + else { + // We have to set proxyRouterState.setStreamingFailed() here since we couldn't + // call callback.unrecoverableErrorOccurred(...); + proxyRouterState.setStreamingFailed(); + runnableWithTracingAndMdc( + () -> logger.error( + "Unrecoverable error occurred and somehow the StreamingCallback was " + + "not available. This should not be possible. Firing the following " + + "error down the pipeline manually: " + errorMsg, + errorToFire), + ctx + ).run(); + executeOnlyIfChannelIsActive( + ctx, + "ProxyRouterEndpointExecutionHandler-streamchunk-writefuture-unsuccessful", + () -> ctx.fireExceptionCaught(errorToFire) + ); + } + } + finally { + // Close down the StreamingChannel so its Channel can be released back to the pool. + sc.closeChannelDueToUnrecoverableError(future.cause()); } } }); } else { + StreamingChannel scToNotify = sc; try { // Something blew up while attempting to send a chunk to the downstream server. - StreamingChannel scToNotify = sc; if (scToNotify == null) { // No StreamingChannel from the registration future. Try to extract it from the // proxyRouterState directly if possible. @@ -329,34 +320,16 @@ protected void registerChunkStreamingAction( } catch (Throwable t) { runnableWithTracingAndMdc( - () -> logger.error("What? This should never happen. Swallowing.", t), ctx + () -> logger.error("What? This should never happen. Swallowing.", t), + ctx ).run(); } } } - if (scToNotify != null) { - scToNotify.closeChannelDueToUnrecoverableError(); - } - else { - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - Throwable actualCause = unwrapAsyncExceptions(cause); - if (!(actualCause instanceof WrapperException)) { - runnableWithTracingAndMdc( - () -> logger.error( - "Unable to extract StreamingChannel during error handling and the error " - + "that caused it was not a WrapperException, meaning " - + "StreamingAsyncHttpClient.streamDownstreamCall(...) did not properly " - + "handle it. This should likely never happen and might leave things in a " - + "bad state - it should be investigated and fixed! The error that caused " - + "this is: ", - cause), - ctx - ).run(); - } - } - - String errorMsg = "Chunk streaming future came back as being unsuccessful."; + String downstreamChannelId = (scToNotify == null) ? "UNKNOWN" : scToNotify.getChannel().toString(); + String errorMsg = "Chunk streaming future came back as being unsuccessful. " + + "downstream_channel_id=" + downstreamChannelId; Throwable errorToFire = new WrapperException(errorMsg, cause); StreamingCallback callback = proxyRouterState.getStreamingCallback(); @@ -382,6 +355,28 @@ protected void registerChunkStreamingAction( // dangling reference count handle that needs cleaning up. Since there's nothing left to // do with this chunk, we can release it now. msgContent.release(); + + // Close down the StreamingChannel so its Channel can be released back to the pool. + if (scToNotify != null) { + scToNotify.closeChannelDueToUnrecoverableError(cause); + } + else { + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + Throwable actualCause = unwrapAsyncExceptions(cause); + if (!(actualCause instanceof WrapperException)) { + runnableWithTracingAndMdc( + () -> logger.error( + "Unable to extract StreamingChannel during error handling and the error that " + + "caused it was not a WrapperException, meaning " + + "StreamingAsyncHttpClient.streamDownstreamCall(...) did not properly handle it. " + + "This should likely never happen and might leave things in a bad state - it " + + "should be investigated and fixed! The error that caused this is: ", + cause + ), + ctx + ).run(); + } + } } } }); diff --git a/riposte-core/src/test/java/com/nike/riposte/client/asynchttp/netty/StreamingAsyncHttpClientTest.java b/riposte-core/src/test/java/com/nike/riposte/client/asynchttp/netty/StreamingAsyncHttpClientTest.java index 76d6bc64..f8cc6370 100644 --- a/riposte-core/src/test/java/com/nike/riposte/client/asynchttp/netty/StreamingAsyncHttpClientTest.java +++ b/riposte-core/src/test/java/com/nike/riposte/client/asynchttp/netty/StreamingAsyncHttpClientTest.java @@ -2,108 +2,413 @@ import com.nike.riposte.client.asynchttp.netty.StreamingAsyncHttpClient.ObjectHolder; import com.nike.riposte.client.asynchttp.netty.StreamingAsyncHttpClient.StreamingChannel; +import com.nike.wingtips.Span; + +import com.tngtech.java.junit.dataprovider.DataProvider; +import com.tngtech.java.junit.dataprovider.DataProviderRunner; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; +import java.util.Deque; +import java.util.Map; + +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; import io.netty.channel.pool.ChannelPool; import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.Attribute; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import static com.nike.riposte.client.asynchttp.netty.StreamingAsyncHttpClient.CHANNEL_IS_BROKEN_ATTR; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; /** * Tests the functionality of {@link StreamingAsyncHttpClient}. * * @author Nic Munroe */ +@RunWith(DataProviderRunner.class) public class StreamingAsyncHttpClientTest { private Channel channelMock; private ChannelPool channelPoolMock; + private EventLoop eventLoopMock; private ObjectHolder callActiveHolder; - private StreamingChannel streamingChannel; + private ObjectHolder downstreamLastChunkSentHolder; + private StreamingChannel streamingChannelSpy; private HttpContent contentChunkMock; - private ChannelFuture streamChunkChannelFutureMock; + private ChannelFuture writeAndFlushChannelFutureMock; private Attribute channelIsBrokenAttrMock; + private ChannelPromise streamChunkChannelPromiseMock; + + private ChannelFuture failedFutureMock; + @Before public void beforeMethod() { channelMock = mock(Channel.class); channelPoolMock = mock(ChannelPool.class); + eventLoopMock = mock(EventLoop.class); contentChunkMock = mock(HttpContent.class); callActiveHolder = new ObjectHolder<>(); callActiveHolder.heldObject = true; - streamingChannel = new StreamingChannel(channelMock, channelPoolMock, callActiveHolder); + downstreamLastChunkSentHolder = new ObjectHolder<>(); + downstreamLastChunkSentHolder.heldObject = false; - streamChunkChannelFutureMock = mock(ChannelFuture.class); + streamingChannelSpy = spy(new StreamingChannel(channelMock, channelPoolMock, callActiveHolder, + downstreamLastChunkSentHolder, null, null)); - doReturn(streamChunkChannelFutureMock).when(channelMock).writeAndFlush(contentChunkMock); + writeAndFlushChannelFutureMock = mock(ChannelFuture.class); + + doReturn(eventLoopMock).when(channelMock).eventLoop(); + + doReturn(writeAndFlushChannelFutureMock).when(channelMock).writeAndFlush(contentChunkMock); channelIsBrokenAttrMock = mock(Attribute.class); doReturn(channelIsBrokenAttrMock).when(channelMock).attr(CHANNEL_IS_BROKEN_ATTR); + + streamChunkChannelPromiseMock = mock(ChannelPromise.class); + doReturn(streamChunkChannelPromiseMock).when(channelMock).newPromise(); + + failedFutureMock = mock(ChannelFuture.class); + doReturn(failedFutureMock).when(channelMock).newFailedFuture(any(Throwable.class)); + } + + @Test + public void constructor_sets_fields_as_expected() { + // given + Deque spanStackMock = mock(Deque.class); + Map mdcInfoMock = mock(Map.class); + + // when + StreamingChannel sc = new StreamingChannel( + channelMock, channelPoolMock, callActiveHolder, downstreamLastChunkSentHolder, spanStackMock, mdcInfoMock + ); + + // then + assertThat(sc.channel).isSameAs(channelMock); + assertThat(sc.getChannel()).isSameAs(sc.channel); + assertThat(sc.pool).isSameAs(channelPoolMock); + assertThat(sc.callActiveHolder).isSameAs(callActiveHolder); + assertThat(sc.downstreamLastChunkSentHolder).isSameAs(downstreamLastChunkSentHolder); + assertThat(sc.distributedTracingSpanStack).isSameAs(spanStackMock); + assertThat(sc.distributedTracingMdcInfo).isSameAs(mdcInfoMock); + } + + @Test + public void StreamingChannel_streamChunk_sets_up_task_in_event_loop_to_call_doStreamChunk_and_adds_listener_to_complete_promise() + throws Exception { + // given + ChannelFuture doStreamChunkFutureMock = mock(ChannelFuture.class); + doReturn(doStreamChunkFutureMock).when(streamingChannelSpy).doStreamChunk(any(HttpContent.class)); + + // when + ChannelFuture result = streamingChannelSpy.streamChunk(contentChunkMock); + + // then + assertThat(result).isSameAs(streamChunkChannelPromiseMock); + verifyZeroInteractions(streamChunkChannelPromiseMock); // not yet completed + ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(eventLoopMock).execute(taskCaptor.capture()); + Runnable task = taskCaptor.getValue(); + + // and given + verify(streamingChannelSpy, never()).doStreamChunk(any(HttpContent.class)); // not yet called + + // when + task.run(); + + // then + verify(streamingChannelSpy).doStreamChunk(contentChunkMock); + ArgumentCaptor listenerCaptor = ArgumentCaptor.forClass(GenericFutureListener.class); + verify(doStreamChunkFutureMock).addListener(listenerCaptor.capture()); + GenericFutureListener listener = listenerCaptor.getValue(); + assertThat(listener).isNotNull(); + + // and when + listener.operationComplete(getFutureForCase(true, false, null)); + + // then + verify(streamChunkChannelPromiseMock).cancel(true); + verifyNoMoreInteractions(streamChunkChannelPromiseMock); + + // and when + listener.operationComplete(getFutureForCase(false, true, null)); + + // then + verify(streamChunkChannelPromiseMock).setSuccess(); + verifyNoMoreInteractions(streamChunkChannelPromiseMock); + + // and when + Throwable normalFutureFailure = new RuntimeException("normal future failure"); + listener.operationComplete(getFutureForCase(false, false, normalFutureFailure)); + + // then + verify(streamChunkChannelPromiseMock).setFailure(normalFutureFailure); + verifyNoMoreInteractions(streamChunkChannelPromiseMock); + + // and when + reset(streamChunkChannelPromiseMock); + listener.operationComplete(getFutureForCase(false, false, null)); + + // then + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(streamChunkChannelPromiseMock).setFailure(throwableCaptor.capture()); + assertThat(throwableCaptor.getValue()).hasMessage("Received ChannelFuture that was in an impossible state"); + verifyNoMoreInteractions(streamChunkChannelPromiseMock); + } + + private Future getFutureForCase(boolean isCanceled, boolean isSuccess, Throwable failureCause) { + Future futureMock = mock(Future.class); + doReturn(isCanceled).when(futureMock).isCancelled(); + doReturn(isSuccess).when(futureMock).isSuccess(); + doReturn(failureCause).when(futureMock).cause(); + return futureMock; + } + + @Test + public void StreamingChannel_streamChunk_fails_promise_with_unexpected_exception() { + // given + Throwable crazyEx = new RuntimeException("kaboom"); + doThrow(crazyEx).when(eventLoopMock).execute(any(Runnable.class)); + + // when + ChannelFuture result = streamingChannelSpy.streamChunk(contentChunkMock); + + // then + verifyFailedChannelFuture(result, "StreamingChannel.streamChunk() threw an exception", crazyEx); + } + + private void verifyFailedChannelFuture(ChannelFuture result, + String expectedExceptionMessagePrefix, + Throwable expectedExCause) { + assertThat(result).isSameAs(failedFutureMock); + ArgumentCaptor exCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(channelMock).newFailedFuture(exCaptor.capture()); + Throwable exThatFailedTheFuture = exCaptor.getValue(); + assertThat(exThatFailedTheFuture).hasMessageStartingWith(expectedExceptionMessagePrefix); + if (expectedExCause != null) + assertThat(exThatFailedTheFuture).hasCause(expectedExCause); } @Test - public void StreamingChannel_streamChunk_works_as_expected_for_normal_case() { + public void StreamingChannel_doStreamChunk_works_as_expected_when_last_chunk_already_sent_downstream_and_incoming_chunk_is_empty_last_chunk() { + // given + streamingChannelSpy.downstreamLastChunkSentHolder.heldObject = true; + + LastHttpContent contentChunkMock = mock(LastHttpContent.class); + ByteBuf contentByteBufMock = mock(ByteBuf.class); + doReturn(contentByteBufMock).when(contentChunkMock).content(); + doReturn(0).when(contentByteBufMock).readableBytes(); + + ChannelFuture successFutureMock = mock(ChannelFuture.class); + doReturn(successFutureMock).when(channelMock).newSucceededFuture(); + // when - ChannelFuture result = streamingChannel.streamChunk(contentChunkMock); + ChannelFuture result = streamingChannelSpy.doStreamChunk(contentChunkMock); + + // then + verify(channelMock, never()).writeAndFlush(any(Object.class)); + verify(contentChunkMock).release(); + verify(channelMock).newSucceededFuture(); + assertThat(result).isSameAs(successFutureMock); + } + + @DataProvider(value = { + "false | 0", + "true | 42" + }, splitBy = "\\|") + @Test + public void StreamingChannel_doStreamChunk_works_as_expected_when_last_chunk_already_sent_downstream_and_incoming_chunk_does_not_match_requirements( + boolean chunkIsLastChunk, int readableBytes + ) { + // given + streamingChannelSpy.downstreamLastChunkSentHolder.heldObject = true; + streamingChannelSpy.callActiveHolder.heldObject = true; + streamingChannelSpy.channelClosedDueToUnrecoverableError = false; + + HttpContent contentChunkMock = (chunkIsLastChunk) ? mock(LastHttpContent.class) : mock(HttpContent.class); + ByteBuf contentByteBufMock = mock(ByteBuf.class); + doReturn(contentByteBufMock).when(contentChunkMock).content(); + doReturn(readableBytes).when(contentByteBufMock).readableBytes(); + + doReturn(writeAndFlushChannelFutureMock).when(channelMock).writeAndFlush(contentChunkMock); + + // when + ChannelFuture result = streamingChannelSpy.doStreamChunk(contentChunkMock); // then verify(channelMock).writeAndFlush(contentChunkMock); - assertThat(result).isSameAs(streamChunkChannelFutureMock); + assertThat(result).isSameAs(writeAndFlushChannelFutureMock); + } + + @Test + public void StreamingChannel_doStreamChunk_works_as_expected_when_downstream_call_is_not_active() { + // given + streamingChannelSpy.callActiveHolder.heldObject = false; + + // when + ChannelFuture result = streamingChannelSpy.doStreamChunk(contentChunkMock); + + // then + verify(channelMock, never()).writeAndFlush(any(Object.class)); + verify(contentChunkMock).release(); + + verifyFailedChannelFuture( + result, "Unable to stream chunk - downstream call is no longer active.", null + ); } @Test - public void StreamingChannel_streamChunk_works_as_expected_when_closeChannelDueToUnrecoverableError_was_called_previously() { + public void StreamingChannel_doStreamChunk_works_as_expected_when_closeChannelDueToUnrecoverableError_was_called_previously() { // given - ChannelFuture failedChannelFutureMock = mock(ChannelFuture.class); - doReturn(failedChannelFutureMock).when(channelMock).newFailedFuture(any(Throwable.class)); - streamingChannel.closeChannelDueToUnrecoverableError(); + streamingChannelSpy.channelClosedDueToUnrecoverableError = true; // when - ChannelFuture result = streamingChannel.streamChunk(contentChunkMock); + ChannelFuture result = streamingChannelSpy.doStreamChunk(contentChunkMock); // then verify(channelMock, never()).writeAndFlush(any(Object.class)); - assertThat(result).isSameAs(failedChannelFutureMock); verify(contentChunkMock).release(); - ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Throwable.class); - verify(channelMock).newFailedFuture(exceptionCaptor.capture()); - assertThat(exceptionCaptor.getValue()) - .hasMessage("Unable to stream chunks downstream - " - + "the channel was closed previously due to an unrecoverable error" - ); + verifyFailedChannelFuture( + result, "Unable to stream chunks downstream - the channel was closed previously due to an unrecoverable error", null + ); } @Test - public void StreamingChannel_closeChannelDueToUnrecoverableError_works_as_expected() { + public void StreamingChannel_doStreamChunk_works_as_expected_when_crazy_exception_is_thrown() { // given - assertThat(streamingChannel.channelClosedDueToUnrecoverableError).isFalse(); - assertThat(streamingChannel.callActiveHolder.heldObject).isTrue(); + Throwable crazyEx = new RuntimeException("kaboom"); + doThrow(crazyEx).when(channelMock).writeAndFlush(any(Object.class)); // when - streamingChannel.closeChannelDueToUnrecoverableError(); + ChannelFuture result = streamingChannelSpy.doStreamChunk(contentChunkMock); // then - assertThat(streamingChannel.channelClosedDueToUnrecoverableError).isTrue(); - verify(channelIsBrokenAttrMock).set(true); - verifyChannelReleasedBackToPool(streamingChannel.callActiveHolder, channelPoolMock, channelMock); - verify(channelMock).close(); + verify(channelMock).writeAndFlush(any(Object.class)); + verify(contentChunkMock, never()).release(); + + verifyFailedChannelFuture( + result, "StreamingChannel.doStreamChunk() threw an exception", crazyEx + ); + } + + @Test + public void StreamingChannel_closeChannelDueToUnrecoverableError_calls_the_do_method_and_sets_field_to_true_when_not_closed_and_call_active() { + // given + Throwable unrecoverableError = new RuntimeException("kaboom"); + streamingChannelSpy.channelClosedDueToUnrecoverableError = false; + streamingChannelSpy.callActiveHolder.heldObject = true; + + // when + streamingChannelSpy.closeChannelDueToUnrecoverableError(unrecoverableError); + + // then + assertThat(streamingChannelSpy.channelClosedDueToUnrecoverableError).isTrue(); + ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(eventLoopMock).execute(taskCaptor.capture()); + Runnable task = taskCaptor.getValue(); + assertThat(task).isNotNull(); + + // and given + verify(streamingChannelSpy, never()).doCloseChannelDueToUnrecoverableError(any(Throwable.class)); + + // when + task.run(); + + // then + verify(streamingChannelSpy).doCloseChannelDueToUnrecoverableError(unrecoverableError); + } + + @DataProvider(value = { + "true | true", + "false | false", + "true | false" + }, splitBy = "\\|") + @Test + public void StreamingChannel_closeChannelDueToUnrecoverableError_sets_field_to_true_but_otherwise_does_nothing_if_already_closed_or_call_inactive( + boolean alreadyClosed, boolean callActive + ) { + // given + Throwable unrecoverableError = new RuntimeException("kaboom"); + streamingChannelSpy.channelClosedDueToUnrecoverableError = alreadyClosed; + streamingChannelSpy.callActiveHolder.heldObject = callActive; + + // when + streamingChannelSpy.closeChannelDueToUnrecoverableError(unrecoverableError); + + // then + assertThat(streamingChannelSpy.channelClosedDueToUnrecoverableError).isTrue(); + verifyZeroInteractions(channelMock); + } + + @Test + public void StreamingChannel_closeChannelDueToUnrecoverableError_sets_field_to_true_even_if_crazy_exception_occurs() { + // given + streamingChannelSpy.channelClosedDueToUnrecoverableError = false; + streamingChannelSpy.callActiveHolder.heldObject = true; + Throwable crazyEx = new RuntimeException("kaboom"); + doThrow(crazyEx).when(channelMock).eventLoop(); + + // when + Throwable caughtEx = catchThrowable( + () -> streamingChannelSpy.closeChannelDueToUnrecoverableError(new RuntimeException("some other error")) + ); + + // then + assertThat(caughtEx).isSameAs(crazyEx); + assertThat(streamingChannelSpy.channelClosedDueToUnrecoverableError).isTrue(); + } + + @DataProvider(value = { + "true", + "false" + }) + @Test + public void StreamingChannel_doCloseChannelDueToUnrecoverableError_works_as_expected(boolean callActive) { + // given + streamingChannelSpy.callActiveHolder.heldObject = callActive; + Throwable unrecoverableError = new RuntimeException("kaboom"); + + // when + streamingChannelSpy.doCloseChannelDueToUnrecoverableError(unrecoverableError); + + // then + if (callActive) { + verify(channelIsBrokenAttrMock).set(true); + verifyChannelReleasedBackToPool(streamingChannelSpy.callActiveHolder, channelPoolMock, channelMock); + verify(channelMock).close(); + } + else { + verify(channelIsBrokenAttrMock, never()).set(anyBoolean()); + verify(channelMock, never()).close(); + } + } private void verifyChannelReleasedBackToPool(ObjectHolder callActiveHolder, diff --git a/riposte-core/src/test/java/com/nike/riposte/server/componenttest/VerifyResponseHttpStatusCodeHandlingRfcCorrectnessComponentTest.java b/riposte-core/src/test/java/com/nike/riposte/server/componenttest/VerifyResponseHttpStatusCodeHandlingRfcCorrectnessComponentTest.java index 6b2607da..dc4da32b 100644 --- a/riposte-core/src/test/java/com/nike/riposte/server/componenttest/VerifyResponseHttpStatusCodeHandlingRfcCorrectnessComponentTest.java +++ b/riposte-core/src/test/java/com/nike/riposte/server/componenttest/VerifyResponseHttpStatusCodeHandlingRfcCorrectnessComponentTest.java @@ -19,6 +19,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import uk.org.lidalia.slf4jext.Level; import uk.org.lidalia.slf4jtest.TestLoggerFactory; @@ -34,9 +36,11 @@ import java.util.stream.IntStream; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpMethod; import io.restassured.response.ExtractableResponse; +import static com.nike.riposte.server.componenttest.VerifyResponseHttpStatusCodeHandlingRfcCorrectnessComponentTest.BackendEndpoint.CALL_ID_RESPONSE_HEADER_KEY; import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; import static io.netty.handler.codec.http.HttpHeaders.Names.TRANSFER_ENCODING; import static io.netty.handler.codec.http.HttpHeaders.Values.CHUNKED; @@ -55,6 +59,8 @@ @RunWith(DataProviderRunner.class) public class VerifyResponseHttpStatusCodeHandlingRfcCorrectnessComponentTest { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static Server backendServer; private static ServerConfig backendServerConfig; @@ -147,6 +153,8 @@ private boolean isContentAlwaysEmptyStatusCode(int statusCode) { @UseDataProvider("responseStatusCodeScenariosDataProvider") public void verify_response_status_code_scenarios(int desiredStatusCode, boolean shouldReturnEmptyPayload) { for (int i = 0; i < 3; i++) { // Run this scenario 3 times in quick succession to catch potential keep-alive connection pooling issues. + logger.info("=== RUN " + i + " ==="); + String callId = UUID.randomUUID().toString(); ExtractableResponse response = given() .config(config().redirect(redirectConfig().followRedirects(false))) .baseUri("http://localhost") @@ -154,12 +162,14 @@ public void verify_response_status_code_scenarios(int desiredStatusCode, boolean .basePath(BackendEndpoint.MATCHING_PATH) .header(BackendEndpoint.DESIRED_RESPONSE_HTTP_STATUS_CODE_HEADER_KEY, String.valueOf(desiredStatusCode)) .header(BackendEndpoint.SHOULD_RETURN_EMPTY_PAYLOAD_BODY_HEADER_KEY, String.valueOf(shouldReturnEmptyPayload)) + .header(BackendEndpoint.CALL_ID_REQUEST_HEADER_KEY, callId) .when() .get() .then() .extract(); assertThat(response.statusCode()).isEqualTo(desiredStatusCode); + assertThat(response.header(CALL_ID_RESPONSE_HEADER_KEY)).isEqualTo(callId); if (isContentAlwaysEmptyStatusCode(desiredStatusCode)) { assertThat(response.asString()).isNullOrEmpty(); assertThat(response.header(CONTENT_LENGTH)).isEqualTo("0"); @@ -171,8 +181,9 @@ public void verify_response_status_code_scenarios(int desiredStatusCode, boolean if (shouldReturnEmptyPayload) assertThat(response.asString()).isNullOrEmpty(); else - assertThat(response.asString()).isEqualTo(BackendEndpoint.NON_EMPTY_PAYLOAD); + assertThat(response.asString()).isEqualTo(callId + BackendEndpoint.NON_EMPTY_PAYLOAD); } + logger.info("=== END RUN " + i + " ==="); } } @@ -240,16 +251,23 @@ public static class BackendEndpoint extends StandardEndpoint { public static final String MATCHING_PATH = "/backendEndpoint"; public static final String DESIRED_RESPONSE_HTTP_STATUS_CODE_HEADER_KEY = "desiredResponseHttpStatusCode"; public static final String SHOULD_RETURN_EMPTY_PAYLOAD_BODY_HEADER_KEY = "shouldReturnEmptyPayloadBody"; + public static final String CALL_ID_REQUEST_HEADER_KEY = "callId"; + public static final String CALL_ID_RESPONSE_HEADER_KEY = "callId-received"; public static final String NON_EMPTY_PAYLOAD = UUID.randomUUID().toString(); @Override public CompletableFuture> execute(RequestInfo request, Executor longRunningTaskExecutor, ChannelHandlerContext ctx) { int statusCode = Integer.parseInt(request.getHeaders().get(DESIRED_RESPONSE_HTTP_STATUS_CODE_HEADER_KEY)); boolean returnEmptyPayload = "true".equals(request.getHeaders().get(SHOULD_RETURN_EMPTY_PAYLOAD_BODY_HEADER_KEY)); - String returnPayload = (returnEmptyPayload) ? null : NON_EMPTY_PAYLOAD; + String callIdReceived = String.valueOf(request.getHeaders().get(CALL_ID_REQUEST_HEADER_KEY)); + String returnPayload = (returnEmptyPayload) ? null : callIdReceived + NON_EMPTY_PAYLOAD; return CompletableFuture.completedFuture( - ResponseInfo.newBuilder(returnPayload).withHttpStatusCode(statusCode).withDesiredContentWriterMimeType("text/plain").build() + ResponseInfo.newBuilder(returnPayload) + .withHttpStatusCode(statusCode) + .withDesiredContentWriterMimeType("text/plain") + .withHeaders(new DefaultHttpHeaders().set(CALL_ID_RESPONSE_HEADER_KEY, callIdReceived)) + .build() ); } diff --git a/riposte-core/src/test/java/com/nike/riposte/server/handler/ChannelPipelineFinalizerHandlerTest.java b/riposte-core/src/test/java/com/nike/riposte/server/handler/ChannelPipelineFinalizerHandlerTest.java index 58e14c70..1afcd5a2 100644 --- a/riposte-core/src/test/java/com/nike/riposte/server/handler/ChannelPipelineFinalizerHandlerTest.java +++ b/riposte-core/src/test/java/com/nike/riposte/server/handler/ChannelPipelineFinalizerHandlerTest.java @@ -531,6 +531,14 @@ public void doChannelInactive_does_not_explode_if_crazy_exception_occurs() throw Assertions.assertThat(result).isEqualTo(PipelineContinuationBehavior.CONTINUE); } + @Test + public void argsAreEligibleForLinkingAndUnlinkingDistributedTracingInfo_returns_false() { + // expect + Assertions.assertThat( + handler.argsAreEligibleForLinkingAndUnlinkingDistributedTracingInfo(null, null, null, null) + ).isFalse(); + } + @Test public void code_coverage_hoops() throws Exception { // jump!