Skip to content

Commit

Permalink
Merge pull request #39020 from geoand/robust-http-exporter
Browse files Browse the repository at this point in the history
Make VertxHttpExporter more robust
  • Loading branch information
brunobat authored Feb 27, 2024
2 parents 8a263e5 + 5c1c67f commit 819ebb3
Showing 1 changed file with 159 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
Expand All @@ -22,6 +24,7 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.quarkus.vertx.core.runtime.BufferOutputStream;
import io.smallrye.mutiny.Uni;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand All @@ -38,6 +41,8 @@ final class VertxHttpExporter implements SpanExporter {
private static final Logger internalLogger = Logger.getLogger(VertxHttpExporter.class.getName());
private static final ThrottlingLogger logger = new ThrottlingLogger(internalLogger);

private static final int MAX_ATTEMPTS = 3;

private final HttpExporter<TraceRequestMarshaler> delegate;

VertxHttpExporter(HttpExporter<TraceRequestMarshaler> delegate) {
Expand Down Expand Up @@ -110,75 +115,35 @@ private static String determineBasePath(URI baseUri) {
@Override
public void send(Consumer<OutputStream> marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Response> onHttpResponseRead,
Consumer<Throwable> onError) {

client.request(HttpMethod.POST, basePath + TRACES_PATH)
.onSuccess(new Handler<>() {
@Override
public void handle(HttpClientRequest request) {

HttpClientRequest clientRequest = request.response(new Handler<>() {
@Override
public void handle(AsyncResult<HttpClientResponse> callResult) {
if (callResult.succeeded()) {
HttpClientResponse clientResponse = callResult.result();
clientResponse.body(new Handler<>() {
@Override
public void handle(AsyncResult<Buffer> bodyResult) {
if (bodyResult.succeeded()) {
onResponse.accept(new Response() {
@Override
public int statusCode() {
return clientResponse.statusCode();
}

@Override
public String statusMessage() {
return clientResponse.statusMessage();
}

@Override
public byte[] responseBody() {
return bodyResult.result().getBytes();
}
});
} else {
onError.accept(bodyResult.cause());
}
}
});
} else {
onError.accept(callResult.cause());
}
}
})
.putHeader("Content-Type", contentType);

Buffer buffer = Buffer.buffer(contentLength);
OutputStream os = new BufferOutputStream(buffer);
if (compressionEnabled) {
clientRequest.putHeader("Content-Encoding", "gzip");
try (var gzos = new GZIPOutputStream(os)) {
marshaler.accept(gzos);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
marshaler.accept(os);
}

if (!headers.isEmpty()) {
for (var entry : headers.entrySet()) {
clientRequest.putHeader(entry.getKey(), entry.getValue());
}
}

clientRequest.send(buffer);
String requestURI = basePath + TRACES_PATH;
var clientRequestSuccessHandler = new ClientRequestSuccessHandler(client, requestURI, headers, compressionEnabled,
contentType,
contentLength, onHttpResponseRead,
onError, marshaler, 1);
initiateSend(client, requestURI, MAX_ATTEMPTS, clientRequestSuccessHandler, onError);
}

private static void initiateSend(HttpClient client, String requestURI,
int numberOfAttempts,
Handler<HttpClientRequest> clientRequestSuccessHandler,
Consumer<Throwable> onError) {
Uni.createFrom().completionStage(new Supplier<CompletionStage<HttpClientRequest>>() {
@Override
public CompletionStage<HttpClientRequest> get() {
return client.request(HttpMethod.POST, requestURI).toCompletionStage();
}
}).onFailure().retry()
.withBackOff(Duration.ofMillis(100))
.atMost(numberOfAttempts)
.subscribe().with(new Consumer<>() {
@Override
public void accept(HttpClientRequest request) {
clientRequestSuccessHandler.handle(request);
}
})
.onFailure(onError::accept);
}, onError);
}

@Override
Expand All @@ -204,5 +169,134 @@ public void handle(Throwable event) {
});
return shutdownResult;
}

private static class ClientRequestSuccessHandler implements Handler<HttpClientRequest> {
private final HttpClient client;
private final String requestURI;
private final Map<String, String> headers;
private final boolean compressionEnabled;
private final String contentType;
private final int contentLength;
private final Consumer<Response> onHttpResponseRead;
private final Consumer<Throwable> onError;
private final Consumer<OutputStream> marshaler;

private final int attemptNumber;

public ClientRequestSuccessHandler(HttpClient client,
String requestURI, Map<String, String> headers,
boolean compressionEnabled,
String contentType,
int contentLength,
Consumer<Response> onHttpResponseRead,
Consumer<Throwable> onError,
Consumer<OutputStream> marshaler,
int attemptNumber) {
this.client = client;
this.requestURI = requestURI;
this.headers = headers;
this.compressionEnabled = compressionEnabled;
this.contentType = contentType;
this.contentLength = contentLength;
this.onHttpResponseRead = onHttpResponseRead;
this.onError = onError;
this.marshaler = marshaler;
this.attemptNumber = attemptNumber;
}

@Override
public void handle(HttpClientRequest request) {

HttpClientRequest clientRequest = request.response(new Handler<>() {
@Override
public void handle(AsyncResult<HttpClientResponse> callResult) {
if (callResult.succeeded()) {
HttpClientResponse clientResponse = callResult.result();
clientResponse.body(new Handler<>() {
@Override
public void handle(AsyncResult<Buffer> bodyResult) {
if (bodyResult.succeeded()) {
if (clientResponse.statusCode() >= 500) {
if (attemptNumber <= MAX_ATTEMPTS) {
// we should retry for 5xx error as they might be recoverable
initiateSend(client, requestURI,
MAX_ATTEMPTS - attemptNumber,
newAttempt(),
onError);
return;
}
}
onHttpResponseRead.accept(new Response() {
@Override
public int statusCode() {
return clientResponse.statusCode();
}

@Override
public String statusMessage() {
return clientResponse.statusMessage();
}

@Override
public byte[] responseBody() {
return bodyResult.result().getBytes();
}
});
} else {
if (attemptNumber <= MAX_ATTEMPTS) {
// retry
initiateSend(client, requestURI,
MAX_ATTEMPTS - attemptNumber,
newAttempt(),
onError);
} else {
onError.accept(bodyResult.cause());
}
}
}
});
} else {
if (attemptNumber <= MAX_ATTEMPTS) {
// retry
initiateSend(client, requestURI,
MAX_ATTEMPTS - attemptNumber,
newAttempt(),
onError);
} else {
onError.accept(callResult.cause());
}
}
}
})
.putHeader("Content-Type", contentType);

Buffer buffer = Buffer.buffer(contentLength);
OutputStream os = new BufferOutputStream(buffer);
if (compressionEnabled) {
clientRequest.putHeader("Content-Encoding", "gzip");
try (var gzos = new GZIPOutputStream(os)) {
marshaler.accept(gzos);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
marshaler.accept(os);
}

if (!headers.isEmpty()) {
for (var entry : headers.entrySet()) {
clientRequest.putHeader(entry.getKey(), entry.getValue());
}
}

clientRequest.send(buffer);
}

public ClientRequestSuccessHandler newAttempt() {
return new ClientRequestSuccessHandler(client, requestURI, headers, compressionEnabled,
contentType, contentLength, onHttpResponseRead,
onError, marshaler, attemptNumber + 1);
}
}
}
}

0 comments on commit 819ebb3

Please sign in to comment.