Skip to content

Commit

Permalink
[disk-buffering] Add debug mode for verbose logging. (#1455)
Browse files Browse the repository at this point in the history
  • Loading branch information
breedx-splk authored Sep 17, 2024
1 parent c7ec823 commit be32409
Show file tree
Hide file tree
Showing 26 changed files with 203 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
Expand All @@ -21,10 +22,11 @@ public static LogRecordFromDiskExporter create(
LogRecordExporter exporter, StorageConfiguration config) throws IOException {
FromDiskExporterImpl<LogRecordData> delegate =
FromDiskExporterImpl.<LogRecordData>builder()
.setFolderName("logs")
.setFolderName(SignalTypes.logs.name())
.setStorageConfiguration(config)
.setDeserializer(SignalDeserializer.ofLogs())
.setExportFunction(exporter::export)
.setDebugEnabled(config.isDebugEnabled())
.build();
return new LogRecordFromDiskExporter(delegate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
Expand All @@ -32,7 +33,7 @@ public static LogRecordToDiskExporter create(
LogRecordExporter delegate, StorageConfiguration config) throws IOException {
ToDiskExporter<LogRecordData> toDisk =
ToDiskExporter.<LogRecordData>builder()
.setFolderName("logs")
.setFolderName(SignalTypes.logs.name())
.setStorageConfiguration(config)
.setSerializer(SignalSerializer.ofLogs())
.setExportFunction(delegate::export)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
Expand All @@ -21,10 +22,11 @@ public static MetricFromDiskExporter create(MetricExporter exporter, StorageConf
throws IOException {
FromDiskExporterImpl<MetricData> delegate =
FromDiskExporterImpl.<MetricData>builder()
.setFolderName("metrics")
.setFolderName(SignalTypes.metrics.name())
.setStorageConfiguration(config)
.setDeserializer(SignalDeserializer.ofMetrics())
.setExportFunction(exporter::export)
.setDebugEnabled(config.isDebugEnabled())
.build();
return new MetricFromDiskExporter(delegate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
Expand Down Expand Up @@ -42,7 +43,7 @@ public static MetricToDiskExporter create(
throws IOException {
ToDiskExporter<MetricData> toDisk =
ToDiskExporter.<MetricData>builder()
.setFolderName("metrics")
.setFolderName(SignalTypes.metrics.name())
.setStorageConfiguration(config)
.setSerializer(SignalSerializer.ofMetrics())
.setExportFunction(delegate::export)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
Expand All @@ -21,10 +22,11 @@ public static SpanFromDiskExporter create(SpanExporter exporter, StorageConfigur
throws IOException {
FromDiskExporterImpl<SpanData> delegate =
FromDiskExporterImpl.<SpanData>builder()
.setFolderName("spans")
.setFolderName(SignalTypes.spans.name())
.setStorageConfiguration(config)
.setDeserializer(SignalDeserializer.ofSpans())
.setExportFunction(exporter::export)
.setDebugEnabled(config.isDebugEnabled())
.build();
return new SpanFromDiskExporter(delegate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
Expand All @@ -33,7 +34,7 @@ public static SpanToDiskExporter create(SpanExporter delegate, StorageConfigurat
throws IOException {
ToDiskExporter<SpanData> toDisk =
ToDiskExporter.<SpanData>builder()
.setFolderName("spans")
.setFolderName(SignalTypes.spans.name())
.setStorageConfiguration(config)
.setSerializer(SignalSerializer.ofSpans())
.setExportFunction(delegate::export)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public abstract class StorageConfiguration {
/** The root storage location for buffered telemetry. */
public abstract File getRootDir();

/** Returns true if the storage has been configured with debug verbosity enabled. */
public abstract boolean isDebugEnabled();

/** The max amount of time a file can receive new data. */
public abstract long getMaxFileAgeForWriteMillis();

Expand Down Expand Up @@ -62,6 +65,7 @@ public static Builder builder() {
.setMaxFileAgeForWriteMillis(TimeUnit.SECONDS.toMillis(30))
.setMinFileAgeForReadMillis(TimeUnit.SECONDS.toMillis(33))
.setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(18))
.setDebugEnabled(false)
.setTemporaryFileProvider(fileProvider);
}

Expand All @@ -81,6 +85,8 @@ public abstract static class Builder {

public abstract Builder setRootDir(File rootDir);

public abstract Builder setDebugEnabled(boolean debugEnabled);

public abstract StorageConfiguration build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class FromDiskExporterBuilder<T> {
private Function<Collection<T>, CompletableResultCode> exportFunction =
x -> CompletableResultCode.ofFailure();

private boolean debugEnabled = false;

@NotNull
private static <T> SignalDeserializer<T> noopDeserializer() {
return x -> emptyList();
Expand Down Expand Up @@ -63,8 +65,19 @@ public FromDiskExporterBuilder<T> setExportFunction(
return this;
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> enableDebug() {
return setDebugEnabled(true);
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setDebugEnabled(boolean debugEnabled) {
this.debugEnabled = debugEnabled;
return this;
}

public FromDiskExporterImpl<T> build() throws IOException {
Storage storage = storageBuilder.build();
return new FromDiskExporterImpl<>(serializer, exportFunction, storage);
return new FromDiskExporterImpl<>(serializer, exportFunction, storage, debugEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,35 @@
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Signal-type generic class that can read telemetry previously buffered on disk and send it to
* another delegated exporter.
*/
public final class FromDiskExporterImpl<EXPORT_DATA> implements FromDiskExporter {
private final DebugLogger logger;
private final Storage storage;
private final SignalDeserializer<EXPORT_DATA> deserializer;
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;
private static final Logger logger = Logger.getLogger(FromDiskExporterImpl.class.getName());

FromDiskExporterImpl(
SignalDeserializer<EXPORT_DATA> deserializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
Storage storage) {
Storage storage,
boolean debugEnabled) {
this.deserializer = deserializer;
this.exportFunction = exportFunction;
this.storage = storage;
this.logger =
DebugLogger.wrap(Logger.getLogger(FromDiskExporterImpl.class.getName()), debugEnabled);
}

public static <T> FromDiskExporterBuilder<T> builder() {
Expand All @@ -44,19 +48,26 @@ public static <T> FromDiskExporterBuilder<T> builder() {
*
* @param timeout The amount of time to wait for the wrapped exporter to finish.
* @param unit The unit of the time provided.
* @return true if there was data available and it was successfully exported within the timeout
* @return true if there was data available, and it was successfully exported within the timeout
* provided. false otherwise.
* @throws IOException If an unexpected error happens.
*/
@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
logger.log(Level.INFO, "Attempting to export batch from disk.");
logger.log("Attempting to export " + deserializer.signalType() + " batch from disk.");
ReadableResult result =
storage.readAndProcess(
bytes -> {
logger.log(Level.INFO, "About to export stored batch.");
CompletableResultCode join =
exportFunction.apply(deserializer.deserialize(bytes)).join(timeout, unit);
logger.log(
"Read "
+ bytes.length
+ " "
+ deserializer.signalType()
+ " bytes from storage.");
List<EXPORT_DATA> telemetry = deserializer.deserialize(bytes);
logger.log(
"Now exporting batch of " + telemetry.size() + " " + deserializer.signalType());
CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit);
return join.isSuccess();
});
return result == ReadableResult.SUCCEEDED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
Expand All @@ -16,36 +17,38 @@

public class ToDiskExporter<EXPORT_DATA> {

private static final Logger logger = Logger.getLogger(ToDiskExporter.class.getName());
private final DebugLogger logger;
private final Storage storage;
private final SignalSerializer<EXPORT_DATA> serializer;
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;

ToDiskExporter(
SignalSerializer<EXPORT_DATA> serializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
Storage storage) {
Storage storage,
boolean debugEnabled) {
this.serializer = serializer;
this.exportFunction = exportFunction;
this.storage = storage;
this.logger = DebugLogger.wrap(Logger.getLogger(ToDiskExporter.class.getName()), debugEnabled);
}

public static <T> ToDiskExporterBuilder<T> builder() {
return new ToDiskExporterBuilder<>();
}

public CompletableResultCode export(Collection<EXPORT_DATA> data) {
logger.log(Level.FINER, "Intercepting exporter batch.");
logger.log("Intercepting exporter batch.", Level.FINER);
try {
if (storage.write(serializer.serialize(data))) {
return CompletableResultCode.ofSuccess();
}
logger.log(Level.INFO, "Could not store batch in disk. Exporting it right away.");
logger.log("Could not store batch in disk. Exporting it right away.");
return exportFunction.apply(data);
} catch (IOException e) {
logger.log(
Level.WARNING,
"An unexpected error happened while attempting to write the data in disk. Exporting it right away.",
Level.WARNING,
e);
return exportFunction.apply(data);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,21 @@ public final class ToDiskExporterBuilder<T> {

private Function<Collection<T>, CompletableResultCode> exportFunction =
x -> CompletableResultCode.ofFailure();
private boolean debugEnabled = false;

ToDiskExporterBuilder() {}

@CanIgnoreReturnValue
public ToDiskExporterBuilder<T> enableDebug() {
return setDebugEnabled(true);
}

@CanIgnoreReturnValue
public ToDiskExporterBuilder<T> setDebugEnabled(boolean debugEnabled) {
this.debugEnabled = debugEnabled;
return this;
}

@CanIgnoreReturnValue
public ToDiskExporterBuilder<T> setFolderName(String folderName) {
storageBuilder.setFolderName(folderName);
Expand Down Expand Up @@ -61,7 +73,7 @@ public ToDiskExporterBuilder<T> setExportFunction(

public ToDiskExporter<T> build() throws IOException {
Storage storage = storageBuilder.build();
return new ToDiskExporter<>(serializer, exportFunction, storage);
return new ToDiskExporter<>(serializer, exportFunction, storage, debugEnabled);
}

private static void validateConfiguration(StorageConfiguration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.proto.logs.v1.LogsData;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.io.IOException;
Expand All @@ -28,4 +29,9 @@ public List<LogRecordData> deserialize(byte[] source) {
throw new IllegalArgumentException(e);
}
}

@Override
public String signalType() {
return SignalTypes.logs.name();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics.ProtoMetricsDataMapper;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.io.IOException;
Expand All @@ -28,4 +29,9 @@ public List<MetricData> deserialize(byte[] source) {
throw new IllegalArgumentException(e);
}
}

@Override
public String signalType() {
return SignalTypes.metrics.name();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,11 @@ static SignalDeserializer<LogRecordData> ofLogs() {
return LogRecordDataDeserializer.getInstance();
}

/** Deserializes the given byte array into a list of telemetry items. */
List<SDK_ITEM> deserialize(byte[] source);

/** Returns the name of the stored type of signal -- one of "metrics", "spans", or "logs". */
default String signalType() {
return "unknown";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.ProtoSpansDataMapper;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.proto.trace.v1.TracesData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
Expand All @@ -28,4 +29,9 @@ public List<SpanData> deserialize(byte[] source) {
throw new IllegalArgumentException(e);
}
}

@Override
public String signalType() {
return SignalTypes.spans.name();
}
}
Loading

0 comments on commit be32409

Please sign in to comment.