Skip to content

Commit

Permalink
Make suppressing logger configurable for ReportingService. (#257)
Browse files Browse the repository at this point in the history
Fixes #206

Use WavefrontClient.Builder#reportingServiceLogSuppressTimeSeconds()

-------
Co-authored by: Glenn Oppegard <goppegard@vmware.com>
  • Loading branch information
akodali18 authored Feb 7, 2023
1 parent d9e0cba commit be2ffad
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
6 changes: 5 additions & 1 deletion src/main/java/com/wavefront/sdk/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.wavefront.sdk.common.Pair;
import com.wavefront.sdk.common.WavefrontSender;
import com.wavefront.sdk.common.clients.WavefrontClient;
import com.wavefront.sdk.common.clients.WavefrontClientFactory;
import com.wavefront.sdk.direct.ingestion.WavefrontDirectIngestionClient;
import com.wavefront.sdk.entities.histograms.HistogramGranularity;
Expand Down Expand Up @@ -119,8 +120,11 @@ public static void main(String[] args) throws InterruptedException, IOException
token + "@" + wavefrontServer.substring(wavefrontServer.indexOf("://")+3);
System.out.println("wavefrontServerWithToken = " + wavefrontServerWithToken);

WavefrontClient.Builder wfClientBuilder = new WavefrontClient.Builder(wavefrontServer, token);
WavefrontSender wavefrontSender = wfClientBuilder.build();

WavefrontClientFactory wavefrontClientFactory = new WavefrontClientFactory();
wavefrontClientFactory.addClient(wavefrontServerWithToken);
wavefrontClientFactory.addClient(wavefrontSender);

// DEPRECATED Client: Direct Data Ingestion
// WavefrontDirectIngestionClient wavefrontDirectIngestionClient =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public static class Builder {
private int tracesPort = -1;
private int maxQueueSize = 500000;
private int batchSize = 10000;
private long reportingServiceLogSuppressTimeSeconds = 300;
private long flushInterval = 1;
private TimeUnit flushIntervalTimeUnit = TimeUnit.SECONDS;
private int messageSizeBytes = Integer.MAX_VALUE;
Expand Down Expand Up @@ -198,6 +199,17 @@ public Builder batchSize(int batchSize) {
return this;
}

/**
* Set the reportingService log suppression time in seconds. The logs will be suppressed until this time elapses.
*
* @param reportingServiceLogSuppressTimeSeconds
* @return {@code this}
*/
public Builder reportingServiceLogSuppressTimeSeconds(long reportingServiceLogSuppressTimeSeconds) {
this.reportingServiceLogSuppressTimeSeconds = reportingServiceLogSuppressTimeSeconds;
return this;
}

/**
* Set interval at which you want to flush points to Wavefront cluster.
*
Expand Down Expand Up @@ -354,8 +366,8 @@ private WavefrontClient(Builder builder) {
spanLogsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize);
eventsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize);
logsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize);
metricsReportingService = new ReportingService(builder.metricsUri, builder.token);
tracesReportingService = new ReportingService(builder.tracesUri, builder.token);
metricsReportingService = new ReportingService(builder.metricsUri, builder.token, builder.reportingServiceLogSuppressTimeSeconds);
tracesReportingService = new ReportingService(builder.tracesUri, builder.token, builder.reportingServiceLogSuppressTimeSeconds);
scheduler = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("wavefrontClientSender").setDaemon(true));
scheduler.scheduleAtFixedRate(this, 1, builder.flushInterval, builder.flushIntervalTimeUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@
*/
public class ReportingService implements ReportAPI {

private static final MessageSuppressingLogger MESSAGE_SUPPRESSING_LOGGER =
new MessageSuppressingLogger(Logger.getLogger(ReportingService.class.getCanonicalName()),
5, TimeUnit.MINUTES);

// This logger is intended to be configurable in the WavefrontClient.Builder. Given that the invoker controls the
// configuration, this is not a static logger.
private final MessageSuppressingLogger messageSuppressingLogger;
private final String token;
private final URI uri;

Expand All @@ -36,9 +35,14 @@ public class ReportingService implements ReportAPI {
private static final int BUFFER_SIZE = 4096;
private static final int NO_HTTP_RESPONSE = -1;

public ReportingService(URI uri, @Nullable String token) {
public ReportingService(URI uri, @Nullable String token, long reportingServiceLogSuppressTimeSeconds) {
this.uri = uri;
this.token = token;
// Setting suppress time to 0 invalidates the cache used by the message suppressing logger and doesn't log anything.
// So defaulting to the minimum of 1 second.
reportingServiceLogSuppressTimeSeconds = reportingServiceLogSuppressTimeSeconds <= 0 ? 1 : reportingServiceLogSuppressTimeSeconds;
this.messageSuppressingLogger = new MessageSuppressingLogger(Logger.getLogger(
ReportingService.class.getCanonicalName()), reportingServiceLogSuppressTimeSeconds, TimeUnit.SECONDS);
}

@Override
Expand Down Expand Up @@ -67,7 +71,7 @@ public int send(String format, InputStream stream) {
}
statusCode = urlConn.getResponseCode();
readAndClose(urlConn.getInputStream());
MESSAGE_SUPPRESSING_LOGGER.reset(urlConn.getURL().toString());
messageSuppressingLogger.reset(urlConn.getURL().toString());
} catch (IOException ex) {
if (urlConn != null) {
return safeGetResponseCodeAndClose(urlConn);
Expand Down Expand Up @@ -117,7 +121,7 @@ public int sendEvent(InputStream stream) {

statusCode = urlConn.getResponseCode();
readAndClose(urlConn.getInputStream());
MESSAGE_SUPPRESSING_LOGGER.reset(urlConn.getURL().toString());
messageSuppressingLogger.reset(urlConn.getURL().toString());
} catch (IOException ex) {
if (urlConn != null) {
return safeGetResponseCodeAndClose(urlConn);
Expand All @@ -131,7 +135,7 @@ private int safeGetResponseCodeAndClose(HttpURLConnection urlConn) {
try {
statusCode = urlConn.getResponseCode();
} catch (IOException ex) {
MESSAGE_SUPPRESSING_LOGGER.log(urlConn.getURL().toString(), Level.SEVERE,
messageSuppressingLogger.log(urlConn.getURL().toString(), Level.SEVERE,
"Unable to obtain status code from the Wavefront service at "
+ urlConn.getURL().toString() + " due to: " + ex);
statusCode = NO_HTTP_RESPONSE;
Expand All @@ -140,7 +144,7 @@ private int safeGetResponseCodeAndClose(HttpURLConnection urlConn) {
try {
readAndClose(urlConn.getErrorStream());
} catch (IOException ex) {
MESSAGE_SUPPRESSING_LOGGER.log(urlConn.getURL().toString(), Level.SEVERE,
messageSuppressingLogger.log(urlConn.getURL().toString(), Level.SEVERE,
"Unable to read and close error stream from the Wavefront service at "
+ urlConn.getURL().toString() + " due to: " + ex);
}
Expand Down

0 comments on commit be2ffad

Please sign in to comment.