From be2ffad967ec460ac4de3b47c81552889a36d487 Mon Sep 17 00:00:00 2001 From: Anil Krishna Kodali <38334031+akodali18@users.noreply.github.com> Date: Tue, 7 Feb 2023 16:04:32 -0700 Subject: [PATCH] Make suppressing logger configurable for ReportingService. (#257) Fixes #206 Use WavefrontClient.Builder#reportingServiceLogSuppressTimeSeconds() ------- Co-authored by: Glenn Oppegard --- src/main/java/com/wavefront/sdk/Main.java | 6 ++++- .../sdk/common/clients/WavefrontClient.java | 16 ++++++++++++-- .../clients/service/ReportingService.java | 22 +++++++++++-------- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/wavefront/sdk/Main.java b/src/main/java/com/wavefront/sdk/Main.java index e5d3f36c..0c720996 100644 --- a/src/main/java/com/wavefront/sdk/Main.java +++ b/src/main/java/com/wavefront/sdk/Main.java @@ -2,6 +2,7 @@ import com.wavefront.sdk.common.Pair; import com.wavefront.sdk.common.WavefrontSender; +import com.wavefront.sdk.common.clients.WavefrontClient; import com.wavefront.sdk.common.clients.WavefrontClientFactory; import com.wavefront.sdk.direct.ingestion.WavefrontDirectIngestionClient; import com.wavefront.sdk.entities.histograms.HistogramGranularity; @@ -119,8 +120,11 @@ public static void main(String[] args) throws InterruptedException, IOException token + "@" + wavefrontServer.substring(wavefrontServer.indexOf("://")+3); System.out.println("wavefrontServerWithToken = " + wavefrontServerWithToken); + WavefrontClient.Builder wfClientBuilder = new WavefrontClient.Builder(wavefrontServer, token); + WavefrontSender wavefrontSender = wfClientBuilder.build(); + WavefrontClientFactory wavefrontClientFactory = new WavefrontClientFactory(); - wavefrontClientFactory.addClient(wavefrontServerWithToken); + wavefrontClientFactory.addClient(wavefrontSender); // DEPRECATED Client: Direct Data Ingestion // WavefrontDirectIngestionClient wavefrontDirectIngestionClient = diff --git a/src/main/java/com/wavefront/sdk/common/clients/WavefrontClient.java b/src/main/java/com/wavefront/sdk/common/clients/WavefrontClient.java index 579a6501..a2518de8 100644 --- a/src/main/java/com/wavefront/sdk/common/clients/WavefrontClient.java +++ b/src/main/java/com/wavefront/sdk/common/clients/WavefrontClient.java @@ -146,6 +146,7 @@ public static class Builder { private int tracesPort = -1; private int maxQueueSize = 500000; private int batchSize = 10000; + private long reportingServiceLogSuppressTimeSeconds = 300; private long flushInterval = 1; private TimeUnit flushIntervalTimeUnit = TimeUnit.SECONDS; private int messageSizeBytes = Integer.MAX_VALUE; @@ -198,6 +199,17 @@ public Builder batchSize(int batchSize) { return this; } + /** + * Set the reportingService log suppression time in seconds. The logs will be suppressed until this time elapses. + * + * @param reportingServiceLogSuppressTimeSeconds + * @return {@code this} + */ + public Builder reportingServiceLogSuppressTimeSeconds(long reportingServiceLogSuppressTimeSeconds) { + this.reportingServiceLogSuppressTimeSeconds = reportingServiceLogSuppressTimeSeconds; + return this; + } + /** * Set interval at which you want to flush points to Wavefront cluster. * @@ -354,8 +366,8 @@ private WavefrontClient(Builder builder) { spanLogsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize); eventsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize); logsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize); - metricsReportingService = new ReportingService(builder.metricsUri, builder.token); - tracesReportingService = new ReportingService(builder.tracesUri, builder.token); + metricsReportingService = new ReportingService(builder.metricsUri, builder.token, builder.reportingServiceLogSuppressTimeSeconds); + tracesReportingService = new ReportingService(builder.tracesUri, builder.token, builder.reportingServiceLogSuppressTimeSeconds); scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("wavefrontClientSender").setDaemon(true)); scheduler.scheduleAtFixedRate(this, 1, builder.flushInterval, builder.flushIntervalTimeUnit); diff --git a/src/main/java/com/wavefront/sdk/common/clients/service/ReportingService.java b/src/main/java/com/wavefront/sdk/common/clients/service/ReportingService.java index 0217aeec..4f7d1f53 100644 --- a/src/main/java/com/wavefront/sdk/common/clients/service/ReportingService.java +++ b/src/main/java/com/wavefront/sdk/common/clients/service/ReportingService.java @@ -24,10 +24,9 @@ */ public class ReportingService implements ReportAPI { - private static final MessageSuppressingLogger MESSAGE_SUPPRESSING_LOGGER = - new MessageSuppressingLogger(Logger.getLogger(ReportingService.class.getCanonicalName()), - 5, TimeUnit.MINUTES); - + // This logger is intended to be configurable in the WavefrontClient.Builder. Given that the invoker controls the + // configuration, this is not a static logger. + private final MessageSuppressingLogger messageSuppressingLogger; private final String token; private final URI uri; @@ -36,9 +35,14 @@ public class ReportingService implements ReportAPI { private static final int BUFFER_SIZE = 4096; private static final int NO_HTTP_RESPONSE = -1; - public ReportingService(URI uri, @Nullable String token) { + public ReportingService(URI uri, @Nullable String token, long reportingServiceLogSuppressTimeSeconds) { this.uri = uri; this.token = token; + // Setting suppress time to 0 invalidates the cache used by the message suppressing logger and doesn't log anything. + // So defaulting to the minimum of 1 second. + reportingServiceLogSuppressTimeSeconds = reportingServiceLogSuppressTimeSeconds <= 0 ? 1 : reportingServiceLogSuppressTimeSeconds; + this.messageSuppressingLogger = new MessageSuppressingLogger(Logger.getLogger( + ReportingService.class.getCanonicalName()), reportingServiceLogSuppressTimeSeconds, TimeUnit.SECONDS); } @Override @@ -67,7 +71,7 @@ public int send(String format, InputStream stream) { } statusCode = urlConn.getResponseCode(); readAndClose(urlConn.getInputStream()); - MESSAGE_SUPPRESSING_LOGGER.reset(urlConn.getURL().toString()); + messageSuppressingLogger.reset(urlConn.getURL().toString()); } catch (IOException ex) { if (urlConn != null) { return safeGetResponseCodeAndClose(urlConn); @@ -117,7 +121,7 @@ public int sendEvent(InputStream stream) { statusCode = urlConn.getResponseCode(); readAndClose(urlConn.getInputStream()); - MESSAGE_SUPPRESSING_LOGGER.reset(urlConn.getURL().toString()); + messageSuppressingLogger.reset(urlConn.getURL().toString()); } catch (IOException ex) { if (urlConn != null) { return safeGetResponseCodeAndClose(urlConn); @@ -131,7 +135,7 @@ private int safeGetResponseCodeAndClose(HttpURLConnection urlConn) { try { statusCode = urlConn.getResponseCode(); } catch (IOException ex) { - MESSAGE_SUPPRESSING_LOGGER.log(urlConn.getURL().toString(), Level.SEVERE, + messageSuppressingLogger.log(urlConn.getURL().toString(), Level.SEVERE, "Unable to obtain status code from the Wavefront service at " + urlConn.getURL().toString() + " due to: " + ex); statusCode = NO_HTTP_RESPONSE; @@ -140,7 +144,7 @@ private int safeGetResponseCodeAndClose(HttpURLConnection urlConn) { try { readAndClose(urlConn.getErrorStream()); } catch (IOException ex) { - MESSAGE_SUPPRESSING_LOGGER.log(urlConn.getURL().toString(), Level.SEVERE, + messageSuppressingLogger.log(urlConn.getURL().toString(), Level.SEVERE, "Unable to read and close error stream from the Wavefront service at " + urlConn.getURL().toString() + " due to: " + ex); }