Skip to content

Commit

Permalink
Fix race conditions and Netty HttpClientCodec subtleties related to P…
Browse files Browse the repository at this point in the history
…roxyRouterEndpoints
  • Loading branch information
nicmunroe committed Apr 20, 2017
1 parent 5ed417e commit bab2d39
Show file tree
Hide file tree
Showing 8 changed files with 735 additions and 162 deletions.
12 changes: 12 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Controls the behavior of the codecov.io integration.
# See https://github.com/codecov/support/wiki/Codecov-Yaml for details on the options.
coverage:
status:
project:
default:
enabled: yes
target: 85%
patch:
default:
enabled: yes
target: 90%

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ protected void finalizeChannelPipeline(ChannelHandlerContext ctx, Object msg, Ht
// If we're in an error case (cause != null) and the response sending has started but not completed, then this
// request is broken. We can't do anything except kill the channel.
if ((cause != null) && state.isResponseSendingStarted() && !state.isResponseSendingLastChunkSent()) {
runnableWithTracingAndMdc(
() -> logger.error(
"Received an error in ChannelPipelineFinalizerHandler after response sending was started, but "
+ "before it finished. Closing the channel. unexpected_error={}", cause.toString()
),
ctx
).run();
ctx.channel().close();
}
}
Expand All @@ -221,8 +228,25 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx)
ProxyRouterProcessingState proxyRouterState =
ChannelAttributes.getProxyRouterProcessingStateForChannel(ctx).get();

RequestInfo<?> requestInfo = (httpState == null) ? null : httpState.getRequestInfo();
ResponseInfo<?> responseInfo = (httpState == null) ? null : httpState.getResponseInfo();
if (httpState == null) {
if (proxyRouterState == null) {
logger.debug("This channel closed before it processed any requests. Nothing to cleanup. "
+ "current_span={}", Tracer.getInstance().getCurrentSpan());
}
else {
// This should never happen, but if it does we'll try to release what we can and then return.
logger.error("Found a channel where HttpProcessingState was null, but ProxyRouterProcessingState "
+ "was not null. This should not be possible! "
+ "current_span={}", Tracer.getInstance().getCurrentSpan());
releaseProxyRouterStateResources(proxyRouterState);
}

// With httpState null, there's nothing left for us to do.
return PipelineContinuationBehavior.CONTINUE;
}

RequestInfo<?> requestInfo = httpState.getRequestInfo();
ResponseInfo<?> responseInfo = httpState.getResponseInfo();

if (logger.isDebugEnabled()) {
runnableWithTracingAndMdc(
Expand All @@ -238,7 +262,7 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx)
// debug investigations can find out what happened.
// TODO: Is there a way we can handle access logging and/or metrics here, but only if it wasn't done elsewhere?
@SuppressWarnings("SimplifiableConditionalExpression")
boolean tracingAlreadyCompleted = (httpState == null) ? true : httpState.isTraceCompletedOrScheduled();
boolean tracingAlreadyCompleted = httpState.isTraceCompletedOrScheduled();
boolean responseNotFullySent = responseInfo == null || !responseInfo.isResponseSendingLastChunkSent();
if (responseNotFullySent || !tracingAlreadyCompleted) {
runnableWithTracingAndMdc(
Expand All @@ -248,7 +272,8 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx)
"The caller's channel was closed before a response could be sent. This means that "
+ "an access log probably does not exist for this request. Distributed tracing "
+ "will be completed now if it wasn't already done. Any dangling resources will be "
+ "released."
+ "released. response_info_is_null={}",
(responseInfo == null)
);
}

Expand All @@ -268,16 +293,7 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx)
if (requestInfo != null)
requestInfo.releaseAllResources();

// Tell the ProxyRouterProcessingState that the stream failed and trigger its chunk streaming error handling
// with an artificial exception. If the call had already succeeded previously then this will do
// nothing, but if it hasn't already succeeded then it's not going to (since the connection is closing)
// and doing this will cause any resources its holding onto to be released.
if (proxyRouterState != null) {
proxyRouterState.setStreamingFailed();
proxyRouterState.triggerStreamingChannelErrorForChunks(
new RuntimeException("Server worker channel closed")
);
}
releaseProxyRouterStateResources(proxyRouterState);
}
catch(Throwable t) {
runnableWithTracingAndMdc(
Expand All @@ -290,4 +306,21 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx)

return PipelineContinuationBehavior.CONTINUE;
}

/**
* Tell the ProxyRouterProcessingState that the stream failed and trigger its chunk streaming error handling with an
* artificial exception. If the call had already succeeded previously then this will do nothing, but if it hasn't
* already succeeded then it's not going to (since the connection is closing) and doing this will cause any
* resources its holding onto to be released.
*
* @param proxyRouterState The state to cleanup.
*/
protected void releaseProxyRouterStateResources(ProxyRouterProcessingState proxyRouterState) {
if (proxyRouterState != null) {
proxyRouterState.setStreamingFailed();
proxyRouterState.triggerStreamingChannelErrorForChunks(
new RuntimeException("Server worker channel closed")
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws
* log message.
*/
protected void channelIdleTriggered(ChannelHandlerContext ctx, IdleStateEvent evt) {
logger.debug(
"Closing server channel due to idle timeout. "
+ "custom_handler_id={}, idle_timeout_millis={}, worker_channel_being_closed={}",
customHandlerIdForLogs, idleTimeoutMillis, ctx.channel().toString()
);
if (logger.isDebugEnabled()) {
logger.debug(
"Closing server channel due to idle timeout. "
+ "custom_handler_id={}, idle_timeout_millis={}, worker_channel_being_closed={}",
customHandlerIdForLogs, idleTimeoutMillis, ctx.channel().toString()
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ else if (msg instanceof HttpContent) {
// and we're done. If not, then we call registerChunkStreamingAction() to set up the
// chunk-streaming behavior and subsequent cleanup for the given HttpContent.
if (!releaseContentChunkIfStreamAlreadyFailed(msgContent, proxyRouterState)) {
registerChunkStreamingAction(proxyRouterState, msgContent, requestInfo, ctx);
registerChunkStreamingAction(proxyRouterState, msgContent, ctx);
}

}
Expand All @@ -251,7 +251,6 @@ else if (msg instanceof HttpContent) {
protected void registerChunkStreamingAction(
ProxyRouterProcessingState proxyRouterState,
HttpContent msgContent,
RequestInfo<?> requestInfo,
ChannelHandlerContext ctx
) {
// We have a content chunk to stream downstream. Attach the chunk processing to the proxyRouterState and
Expand All @@ -266,58 +265,50 @@ protected void registerChunkStreamingAction(

if (cause == null) {
// Nothing has blown up yet, so stream this next chunk downstream. Calling streamChunk() will decrement
// the chunk's reference count, allowing it to be destroyed since this should be the last handle
// on the chunk's memory.
ChannelFuture writeFuture;
try {
writeFuture = sc.streamChunk(msgContent);
}
catch(Throwable t) {
logger.error(
"StreamingAsyncHttpClient.streamChunk() threw an error. This should not be possible and "
+ "indicates something went wrong with a Netty write() and flush(). If you see Netty memory "
+ "leak warnings then this could be why. Please report this along with the stack trace to "
+ "https://github.com/Nike-Inc/riposte/issues/new",
t
);
return;
}

// the chunk's reference count (at some point in the future), allowing it to be destroyed since
// this should be the last handle on the chunk's memory.
ChannelFuture writeFuture = sc.streamChunk(msgContent);
writeFuture.addListener(future -> {
// The chunk streaming is complete, one way or another. React appropriately if there was
// a problem.
if (!future.isSuccess()) {
String errorMsg = "Chunk streaming future came back as being unsuccessful.";
Throwable errorToFire = new WrapperException(errorMsg, future.cause());
sc.closeChannelDueToUnrecoverableError();
StreamingCallback callback = proxyRouterState.getStreamingCallback();
if (callback != null)
callback.unrecoverableErrorOccurred(errorToFire);
else {
// We have to set proxyRouterState.setStreamingFailed() here since we couldn't
// call callback.unrecoverableErrorOccurred(...);
proxyRouterState.setStreamingFailed();
runnableWithTracingAndMdc(
() -> logger.error(
"Unrecoverable error occurred and somehow the StreamingCallback was "
+ "not available. This should not be possible. Firing the following "
+ "error down the pipeline manually: " + errorMsg,
errorToFire),
ctx
).run();
executeOnlyIfChannelIsActive(
ctx,
"ProxyRouterEndpointExecutionHandler-streamchunk-writefuture-unsuccessful",
() -> ctx.fireExceptionCaught(errorToFire)
);
try {
String errorMsg = "Chunk streaming ChannelFuture came back as being unsuccessful. "
+ "downstream_channel_id=" + sc.getChannel().toString();
Throwable errorToFire = new WrapperException(errorMsg, future.cause());
StreamingCallback callback = proxyRouterState.getStreamingCallback();
if (callback != null)
callback.unrecoverableErrorOccurred(errorToFire);
else {
// We have to set proxyRouterState.setStreamingFailed() here since we couldn't
// call callback.unrecoverableErrorOccurred(...);
proxyRouterState.setStreamingFailed();
runnableWithTracingAndMdc(
() -> logger.error(
"Unrecoverable error occurred and somehow the StreamingCallback was "
+ "not available. This should not be possible. Firing the following "
+ "error down the pipeline manually: " + errorMsg,
errorToFire),
ctx
).run();
executeOnlyIfChannelIsActive(
ctx,
"ProxyRouterEndpointExecutionHandler-streamchunk-writefuture-unsuccessful",
() -> ctx.fireExceptionCaught(errorToFire)
);
}
}
finally {
// Close down the StreamingChannel so its Channel can be released back to the pool.
sc.closeChannelDueToUnrecoverableError(future.cause());
}
}
});
}
else {
StreamingChannel scToNotify = sc;
try {
// Something blew up while attempting to send a chunk to the downstream server.
StreamingChannel scToNotify = sc;
if (scToNotify == null) {
// No StreamingChannel from the registration future. Try to extract it from the
// proxyRouterState directly if possible.
Expand All @@ -329,34 +320,16 @@ protected void registerChunkStreamingAction(
}
catch (Throwable t) {
runnableWithTracingAndMdc(
() -> logger.error("What? This should never happen. Swallowing.", t), ctx
() -> logger.error("What? This should never happen. Swallowing.", t),
ctx
).run();
}
}
}

if (scToNotify != null) {
scToNotify.closeChannelDueToUnrecoverableError();
}
else {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
Throwable actualCause = unwrapAsyncExceptions(cause);
if (!(actualCause instanceof WrapperException)) {
runnableWithTracingAndMdc(
() -> logger.error(
"Unable to extract StreamingChannel during error handling and the error "
+ "that caused it was not a WrapperException, meaning "
+ "StreamingAsyncHttpClient.streamDownstreamCall(...) did not properly "
+ "handle it. This should likely never happen and might leave things in a "
+ "bad state - it should be investigated and fixed! The error that caused "
+ "this is: ",
cause),
ctx
).run();
}
}

String errorMsg = "Chunk streaming future came back as being unsuccessful.";
String downstreamChannelId = (scToNotify == null) ? "UNKNOWN" : scToNotify.getChannel().toString();
String errorMsg = "Chunk streaming future came back as being unsuccessful. "
+ "downstream_channel_id=" + downstreamChannelId;
Throwable errorToFire = new WrapperException(errorMsg, cause);

StreamingCallback callback = proxyRouterState.getStreamingCallback();
Expand All @@ -382,6 +355,28 @@ protected void registerChunkStreamingAction(
// dangling reference count handle that needs cleaning up. Since there's nothing left to
// do with this chunk, we can release it now.
msgContent.release();

// Close down the StreamingChannel so its Channel can be released back to the pool.
if (scToNotify != null) {
scToNotify.closeChannelDueToUnrecoverableError(cause);
}
else {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
Throwable actualCause = unwrapAsyncExceptions(cause);
if (!(actualCause instanceof WrapperException)) {
runnableWithTracingAndMdc(
() -> logger.error(
"Unable to extract StreamingChannel during error handling and the error that "
+ "caused it was not a WrapperException, meaning "
+ "StreamingAsyncHttpClient.streamDownstreamCall(...) did not properly handle it. "
+ "This should likely never happen and might leave things in a bad state - it "
+ "should be investigated and fixed! The error that caused this is: ",
cause
),
ctx
).run();
}
}
}
}
});
Expand Down
Loading

0 comments on commit bab2d39

Please sign in to comment.