Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/access logger corner cases #93

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ public void initChannel(SocketChannel ch) {
p.addLast(
CHANNEL_PIPELINE_FINALIZER_HANDLER_NAME,
new ChannelPipelineFinalizerHandler(
exceptionHandlingHandler, responseSender, metricsListener, workerChannelIdleTimeoutMillis
exceptionHandlingHandler, responseSender, metricsListener, accessLogger, workerChannelIdleTimeoutMillis
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpResponse;
Expand Down Expand Up @@ -57,15 +55,21 @@ protected void doAccessLogging(ChannelHandlerContext ctx) throws Exception {
// Due to multiple messages and exception possibilities/interactions it's possible we've already done the access
// logging for this request, so make sure we only do it if appropriate
if (httpProcessingState != null && !httpProcessingState.isAccessLogCompletedOrScheduled()) {
Instant startTime = httpProcessingState.getRequestStartTime();
httpProcessingState.setAccessLogCompletedOrScheduled(true);

// Response end time should already be set by now under normal circumstances,
// but just in case it hasn't (i.e. exception corner cases)...
httpProcessingState.setResponseEndTimeNanosToNowIfNotAlreadySet();

// Gather the remaining bits needed to execute the access logger.
ResponseInfo responseInfo = httpProcessingState.getResponseInfo();
HttpResponse actualResponseObject = httpProcessingState.getActualResponseObject();
RequestInfo requestInfo = httpProcessingState.getRequestInfo();

ChannelFutureListener doTheAccessLoggingOperation = new ChannelFutureListenerWithTracingAndMdc(
(channelFuture) -> accessLogger.log(
requestInfo, actualResponseObject, responseInfo,
Instant.now().minusMillis(startTime.toEpochMilli()).toEpochMilli()
httpProcessingState.calculateTotalRequestTimeMillis()
),
ctx
);
Expand All @@ -76,8 +80,6 @@ protected void doAccessLogging(ChannelHandlerContext ctx) throws Exception {
doTheAccessLoggingOperation.operationComplete(null);
else
httpProcessingState.getResponseWriterFinalChunkChannelFuture().addListener(doTheAccessLoggingOperation);

httpProcessingState.setAccessLogCompletedOrScheduled(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;

import static com.nike.riposte.util.AsyncNettyHelper.runnableWithTracingAndMdc;

Expand All @@ -33,15 +34,30 @@ public class AccessLogStartHandler extends BaseInboundHandlerWithTracingAndMdcSu
private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public PipelineContinuationBehavior doChannelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public PipelineContinuationBehavior doChannelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpProcessingState httpProcessingState = ChannelAttributes.getHttpProcessingStateForChannel(ctx).get();
if (httpProcessingState != null) {
httpProcessingState.setRequestStartTime(Instant.now());
httpProcessingState.setRequestStartTimeNanos(System.nanoTime());
}
else {
runnableWithTracingAndMdc(
() -> logger.warn("HttpProcessingState is null. This shouldn't happen."), ctx
() -> logger.warn("HttpProcessingState is null reading HttpRequest. This shouldn't happen."),
ctx
).run();
}
}

if (msg instanceof LastHttpContent) {
HttpProcessingState httpProcessingState = ChannelAttributes.getHttpProcessingStateForChannel(ctx).get();
if (httpProcessingState != null) {
httpProcessingState.setRequestLastChunkArrivedTimeNanos(System.nanoTime());
}
else {
runnableWithTracingAndMdc(
() -> logger.warn("HttpProcessingState is null reading LastHttpContent. This shouldn't happen."),
ctx
).run();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.nike.riposte.server.http.RequestInfo;
import com.nike.riposte.server.http.ResponseInfo;
import com.nike.riposte.server.http.ResponseSender;
import com.nike.riposte.server.http.impl.RequestInfoImpl;
import com.nike.riposte.server.logging.AccessLogger;
import com.nike.riposte.server.metrics.ServerMetricsEvent;
import com.nike.wingtips.Span;
import com.nike.wingtips.Tracer;
Expand Down Expand Up @@ -48,6 +50,7 @@ public class ChannelPipelineFinalizerHandler extends BaseInboundHandlerWithTraci
private final ExceptionHandlingHandler exceptionHandlingHandler;
private final ResponseSender responseSender;
private final MetricsListener metricsListener;
private final AccessLogger accessLogger;
private final long workerChannelIdleTimeoutMillis;
private static final Throwable ARTIFICIAL_SERVER_WORKER_CHANNEL_CLOSED_EXCEPTION =
new RuntimeException("Server worker channel closed");
Expand All @@ -59,13 +62,17 @@ public class ChannelPipelineFinalizerHandler extends BaseInboundHandlerWithTraci
* @param responseSender
* The {@link ResponseSender} that is used by the pipeline where this class is registered for sending data to
* the user.
* @param accessLogger The {@link AccessLogger} that is used by the pipeline where this class is registered for
* access logging (i.e. the same access logger set on {@link AccessLogEndHandler}).
* @param workerChannelIdleTimeoutMillis
* The time in millis that should be given to {@link IdleChannelTimeoutHandler}s when they are created for
* detecting idle channels that need to be closed.
*/
public ChannelPipelineFinalizerHandler(ExceptionHandlingHandler exceptionHandlingHandler,
ResponseSender responseSender,
MetricsListener metricsListener, long workerChannelIdleTimeoutMillis) {
MetricsListener metricsListener,
AccessLogger accessLogger,
long workerChannelIdleTimeoutMillis) {
if (exceptionHandlingHandler == null)
throw new IllegalArgumentException("exceptionHandlingHandler cannot be null");

Expand All @@ -75,6 +82,7 @@ public ChannelPipelineFinalizerHandler(ExceptionHandlingHandler exceptionHandlin
this.exceptionHandlingHandler = exceptionHandlingHandler;
this.responseSender = responseSender;
this.metricsListener = metricsListener;
this.accessLogger = accessLogger;
this.workerChannelIdleTimeoutMillis = workerChannelIdleTimeoutMillis;
}

Expand Down Expand Up @@ -262,11 +270,14 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx)
).run();
}

// The request/response is definitely done at this point since the channel is closing. Set the response end
// time if it hasn't already been done.
httpState.setResponseEndTimeNanosToNowIfNotAlreadySet();

// Handle the case where the response wasn't fully sent or tracing wasn't completed for some reason.
// We want to finish the distributed tracing span for this request since there's no other place it
// might be done, and if the request wasn't fully sent then we should spit out a log message so
// debug investigations can find out what happened.
// TODO: Is there a way we can handle access logging here, but only if it wasn't done elsewhere? Maybe similar to what we're doing with metrics?
@SuppressWarnings("SimplifiableConditionalExpression")
boolean tracingAlreadyCompleted = httpState.isTraceCompletedOrScheduled();
boolean responseNotFullySent = responseInfo == null || !responseInfo.isResponseSendingLastChunkSent();
Expand All @@ -275,26 +286,39 @@ public PipelineContinuationBehavior doChannelInactive(ChannelHandlerContext ctx)
() -> {
if (responseNotFullySent) {
logger.warn(
"The caller's channel was closed before a response could be sent. This means that "
+ "an access log probably does not exist for this request. Distributed tracing "
+ "will be completed now if it wasn't already done. Any dangling resources will be "
+ "released. response_info_is_null={}",
"The caller's channel was closed before a response could be sent. Distributed tracing "
+ "will be completed now if it wasn't already done, and we will attempt to output an "
+ "access log if needed. Any dangling resources will be released. "
+ "response_info_is_null={}",
(responseInfo == null)
);
}

if (!tracingAlreadyCompleted) {
httpState.setTraceCompletedOrScheduled(true);

Span currentSpan = Tracer.getInstance().getCurrentSpan();
if (currentSpan != null && !currentSpan.isCompleted())
Tracer.getInstance().completeRequestSpan();

httpState.setTraceCompletedOrScheduled(true);
}
},
ctx
).run();
}

// Make sure access logging is handled
if (!httpState.isAccessLogCompletedOrScheduled() && accessLogger != null) {
httpState.setAccessLogCompletedOrScheduled(true);

RequestInfo<?> requestInfoToUse = (requestInfo == null)
? RequestInfoImpl.dummyInstanceForUnknownRequests()
: requestInfo;
accessLogger.log(
requestInfoToUse, httpState.getActualResponseObject(), responseInfo,
httpState.calculateTotalRequestTimeMillis()
);
}

// Make sure metrics is handled
handleMetricsForCompletedRequestIfNotAlreadyDone(httpState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import io.netty.channel.ChannelFuture;
Expand All @@ -30,6 +31,9 @@ public class HttpProcessingState implements ProcessingState {
private Deque<Span> distributedTraceStack;
private Map<String, String> loggerMdcContextMap;
private Instant requestStartTime;
private Long requestStartTimeNanos;
private Long requestLastChunkArrivedTimeNanos;
private Long responseEndTimeNanos;
private ChannelFuture responseWriterFinalChunkChannelFuture;
private boolean traceCompletedOrScheduled = false;
private boolean accessLogCompletedOrScheduled = false;
Expand All @@ -49,6 +53,9 @@ public HttpProcessingState(HttpProcessingState copyMe) {
this.distributedTraceStack = copyMe.getDistributedTraceStack();
this.loggerMdcContextMap = copyMe.getLoggerMdcContextMap();
this.requestStartTime = copyMe.getRequestStartTime();
this.requestStartTimeNanos = copyMe.getRequestStartTimeNanos();
this.requestLastChunkArrivedTimeNanos = copyMe.getRequestLastChunkArrivedTimeNanos();
this.responseEndTimeNanos = copyMe.getResponseEndTimeNanos();
this.responseWriterFinalChunkChannelFuture = copyMe.getResponseWriterFinalChunkChannelFuture();
this.traceCompletedOrScheduled = copyMe.isTraceCompletedOrScheduled();
this.accessLogCompletedOrScheduled = copyMe.isAccessLogCompletedOrScheduled();
Expand All @@ -68,6 +75,9 @@ public void cleanStateForNewRequest() {
distributedTraceStack = null;
loggerMdcContextMap = null;
requestStartTime = null;
requestStartTimeNanos = null;
requestLastChunkArrivedTimeNanos = null;
responseEndTimeNanos = null;
responseWriterFinalChunkChannelFuture = null;
traceCompletedOrScheduled = false;
accessLogCompletedOrScheduled = false;
Expand Down Expand Up @@ -148,6 +158,54 @@ public void setRequestStartTime(Instant requestStartTime) {
this.requestStartTime = requestStartTime;
}

public Long getRequestStartTimeNanos() {
return requestStartTimeNanos;
}

public void setRequestStartTimeNanos(Long requestStartTimeNanos) {
this.requestStartTimeNanos = requestStartTimeNanos;
}

public Long getRequestLastChunkArrivedTimeNanos() {
return requestLastChunkArrivedTimeNanos;
}

public void setRequestLastChunkArrivedTimeNanos(Long requestLastChunkArrivedTimeNanos) {
this.requestLastChunkArrivedTimeNanos = requestLastChunkArrivedTimeNanos;
}

public Long getResponseEndTimeNanos() {
return responseEndTimeNanos;
}

public void setResponseEndTimeNanosToNowIfNotAlreadySet() {
// Only the first piece of code that recognizes that the response is done gets to set response end time.
if (this.responseEndTimeNanos == null) {
this.responseEndTimeNanos = System.nanoTime();
}
}

/**
* @return Convenience method that returns the total time this request took from beginning to end-of-response-sent
* in milliseconds, or null if {@link #getRequestStartTimeNanos()} or {@link #getResponseEndTimeNanos()} is null.
* This method uses {@link TimeUnit#NANOSECONDS} {@link TimeUnit#toMillis(long)} to convert nanoseconds to
* milliseconds, so any nanosecond remainder will be chopped (i.e. 1_999_999 nanoseconds will convert to 1
* millisecond - the 0.999999 milliseconds worth of nanosecond-remainder is dropped).
*
* <p>If you need greater precision than milliseconds you can refer to {@link #getRequestStartTimeNanos()}
* (and/or {@link #getRequestLastChunkArrivedTimeNanos()}) and {@link #getResponseEndTimeNanos()} for nanosecond
* precision, although you'll need to do the math yourself and you'll need to keep in mind that the values are only
* useful in relation to each other - they are not timestamps. For a timestamp you can refer to {@link
* #getRequestStartTime()} which returns an {@link Instant}.
*/
public Long calculateTotalRequestTimeMillis() {
if (requestStartTimeNanos == null || responseEndTimeNanos == null) {
return null;
}

return TimeUnit.NANOSECONDS.toMillis(responseEndTimeNanos - requestStartTimeNanos);
}

public ChannelFuture getResponseWriterFinalChunkChannelFuture() {
return responseWriterFinalChunkChannelFuture;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.nike.riposte.server.http;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nike.riposte.server.channelpipeline.ChannelAttributes;
import com.nike.riposte.server.channelpipeline.message.ChunkedOutboundMessage;
import com.nike.riposte.server.channelpipeline.message.OutboundMessageSendContentChunk;
Expand All @@ -14,6 +12,18 @@
import com.nike.wingtips.TraceAndSpanIdGenerator;
import com.nike.wingtips.TraceHeaders;
import com.nike.wingtips.Tracer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
Expand All @@ -30,13 +40,6 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;

import static com.nike.riposte.util.AsyncNettyHelper.consumerWithTracingAndMdc;
import static com.nike.riposte.util.AsyncNettyHelper.runnableWithTracingAndMdc;
Expand Down Expand Up @@ -513,6 +516,9 @@ else if (chunkToWrite instanceof HttpContent)
if (state != null && isLastChunk) {
// Set the state's responseWriterFinalChunkChannelFuture so that handlers can hook into it if desired.
state.setResponseWriterFinalChunkChannelFuture(writeFuture);

// Always attach a listener that sets response end time.
writeFuture.addListener(future -> state.setResponseEndTimeNanosToNowIfNotAlreadySet());
}

// Always attach a listener that logs write errors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class AccessLogEndHandlerSpec extends Specification {
logger.clearAll()

HttpProcessingState state = Mock(HttpProcessingState)
state.getRequestStartTime() >> Instant.EPOCH
state.calculateTotalRequestTimeMillis() >> 42
state.getResponseInfo() >> Mock(ResponseInfo)
ChannelAttributes.getHttpProcessingStateForChannel(_) >> state
ChannelHandlerContext mockContext = Mock(ChannelHandlerContext)
Expand All @@ -72,6 +72,8 @@ class AccessLogEndHandlerSpec extends Specification {
def (ChannelHandlerContext mockContext, HttpProcessingState state) = mockContext()
ChannelFuture channelFutureMock = Mock(ChannelFuture)
RequestInfo requestInfoMock = Mock(RequestInfo)
Long expectedElapsedTimeMillis = state.calculateTotalRequestTimeMillis()
assert expectedElapsedTimeMillis != null
state.getResponseWriterFinalChunkChannelFuture() >> channelFutureMock
state.isResponseSent() >> true
state.getRequestInfo() >> requestInfoMock
Expand All @@ -89,7 +91,7 @@ class AccessLogEndHandlerSpec extends Specification {
HttpResponse actualResponseObjectMock = state.getActualResponseObject()
ResponseInfo responseInfoMock = state.getResponseInfo()
boolean accessLoggerCalled = false
accessLogger.log(_ as RequestInfo, actualResponseObjectMock, responseInfoMock, _ as Long) >> {
accessLogger.log(_ as RequestInfo, actualResponseObjectMock, responseInfoMock, expectedElapsedTimeMillis) >> {
accessLoggerCalled = true
return null
}
Expand All @@ -105,7 +107,7 @@ class AccessLogEndHandlerSpec extends Specification {

protected List mockContext() {
HttpProcessingState state = Mock(HttpProcessingState)
state.getRequestStartTime() >> Instant.EPOCH
state.calculateTotalRequestTimeMillis() >> 42
state.getResponseInfo() >> Mock(ResponseInfo)
state.getActualResponseObject() >> Mock(HttpResponse)
ChannelAttributes.getHttpProcessingStateForChannel(_) >> state
Expand Down
Loading