Skip to content

Commit

Permalink
Low allocation OTLP logs marshaler (#6429)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored May 9, 2024
1 parent 5297306 commit 715211e
Show file tree
Hide file tree
Showing 19 changed files with 1,245 additions and 195 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.otlp;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.incubator.events.EventLogger;
import io.opentelemetry.api.incubator.logs.AnyValue;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.logs.LowAllocationLogsRequestMarshaler;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
import io.opentelemetry.sdk.logs.internal.SdkEventLoggerProvider;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Warmup;

@BenchmarkMode({Mode.AverageTime})
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@Fork(1)
public class LogsRequestMarshalerBenchmark {

private static final Collection<LogRecordData> LOGS;
private static final LowAllocationLogsRequestMarshaler MARSHALER =
new LowAllocationLogsRequestMarshaler();
private static final TestOutputStream OUTPUT = new TestOutputStream();

static {
InMemoryLogRecordExporter logRecordExporter = InMemoryLogRecordExporter.create();
SdkLoggerProvider loggerProvider =
SdkLoggerProvider.builder()
.setResource(
Resource.create(
Attributes.builder()
.put(AttributeKey.booleanKey("key_bool"), true)
.put(AttributeKey.stringKey("key_string"), "string")
.put(AttributeKey.longKey("key_int"), 100L)
.put(AttributeKey.doubleKey("key_double"), 100.3)
.put(
AttributeKey.stringArrayKey("key_string_array"),
Arrays.asList("string", "string"))
.put(AttributeKey.longArrayKey("key_long_array"), Arrays.asList(12L, 23L))
.put(
AttributeKey.doubleArrayKey("key_double_array"),
Arrays.asList(12.3, 23.1))
.put(
AttributeKey.booleanArrayKey("key_boolean_array"),
Arrays.asList(true, false))
.build()))
.addLogRecordProcessor(SimpleLogRecordProcessor.create(logRecordExporter))
.build();

Logger logger1 = loggerProvider.get("logger");
logger1
.logRecordBuilder()
.setBody("Hello world from this log...")
.setAllAttributes(
Attributes.builder()
.put("key_bool", true)
.put("key_String", "string")
.put("key_int", 100L)
.put("key_double", 100.3)
.build())
.setSeverity(Severity.INFO)
.setSeverityText("INFO")
.emit();

SdkEventLoggerProvider eventLoggerProvider = SdkEventLoggerProvider.create(loggerProvider);
EventLogger eventLogger = eventLoggerProvider.get("event-logger");
eventLogger
.builder("namespace.my-event-name")
// Helper methods to set primitive types
.put("stringKey", "value")
.put("longKey", 1L)
.put("doubleKey", 1.0)
.put("boolKey", true)
// Helper methods to set primitive array types
.put("stringArrKey", "value1", "value2")
.put("longArrKey", 1L, 2L)
.put("doubleArrKey", 1.0, 2.0)
.put("boolArrKey", true, false)
// Set AnyValue types to encode complex data
.put(
"anyValueKey", AnyValue.of(Collections.singletonMap("childKey1", AnyValue.of("value"))))
.emit();

LOGS = logRecordExporter.getFinishedLogRecordItems();
}

@Benchmark
public int marshalStateful() throws IOException {
LogsRequestMarshaler marshaler = LogsRequestMarshaler.create(LOGS);
OUTPUT.reset();
marshaler.writeBinaryTo(OUTPUT);
return OUTPUT.getCount();
}

@Benchmark
public int marshalStatefulJson() throws IOException {
LogsRequestMarshaler marshaler = LogsRequestMarshaler.create(LOGS);
OUTPUT.reset();
marshaler.writeJsonTo(OUTPUT);
return OUTPUT.getCount();
}

@Benchmark
public int marshalStateless() throws IOException {
MARSHALER.initialize(LOGS);
try {
OUTPUT.reset();
MARSHALER.writeBinaryTo(OUTPUT);
return OUTPUT.getCount();
} finally {
MARSHALER.reset();
}
}

@Benchmark
public int marshalStatelessJson() throws IOException {
MARSHALER.initialize(LOGS);
try {
OUTPUT.reset();
MARSHALER.writeJsonTo(OUTPUT);
return OUTPUT.getCount();
} finally {
MARSHALER.reset();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.otlp;

import io.opentelemetry.api.incubator.logs.AnyValue;
import io.opentelemetry.api.incubator.logs.KeyAnyValue;
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

/**
* A Marshaler of key value pairs. See {@link AnyValueMarshaler}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class AnyValueStatelessMarshaler implements StatelessMarshaler<AnyValue<?>> {

public static final AnyValueStatelessMarshaler INSTANCE = new AnyValueStatelessMarshaler();

private AnyValueStatelessMarshaler() {}

@SuppressWarnings("unchecked")
@Override
public void writeTo(Serializer output, AnyValue<?> value, MarshalerContext context)
throws IOException {
switch (value.getType()) {
case STRING:
StringAnyValueStatelessMarshaler.INSTANCE.writeTo(
output, (String) value.getValue(), context);
return;
case BOOLEAN:
BoolAnyValueStatelessMarshaler.INSTANCE.writeTo(
output, (Boolean) value.getValue(), context);
return;
case LONG:
IntAnyValueStatelessMarshaler.INSTANCE.writeTo(output, (Long) value.getValue(), context);
return;
case DOUBLE:
DoubleAnyValueStatelessMarshaler.INSTANCE.writeTo(
output, (Double) value.getValue(), context);
return;
case ARRAY:
output.serializeMessageWithContext(
io.opentelemetry.proto.common.v1.internal.AnyValue.ARRAY_VALUE,
(List<AnyValue<?>>) value.getValue(),
ArrayAnyValueStatelessMarshaler.INSTANCE,
context);
return;
case KEY_VALUE_LIST:
output.serializeMessageWithContext(
io.opentelemetry.proto.common.v1.internal.AnyValue.KVLIST_VALUE,
(List<KeyAnyValue>) value.getValue(),
KeyValueListAnyValueStatelessMarshaler.INSTANCE,
context);
return;
case BYTES:
BytesAnyValueStatelessMarshaler.INSTANCE.writeTo(
output, (ByteBuffer) value.getValue(), context);
return;
}
// Error prone ensures the switch statement is complete, otherwise only can happen with
// unaligned versions which are not supported.
throw new IllegalArgumentException("Unsupported value type.");
}

@SuppressWarnings("unchecked")
@Override
public int getBinarySerializedSize(AnyValue<?> value, MarshalerContext context) {
switch (value.getType()) {
case STRING:
return StringAnyValueStatelessMarshaler.INSTANCE.getBinarySerializedSize(
(String) value.getValue(), context);
case BOOLEAN:
return BoolAnyValueStatelessMarshaler.INSTANCE.getBinarySerializedSize(
(Boolean) value.getValue(), context);
case LONG:
return IntAnyValueStatelessMarshaler.INSTANCE.getBinarySerializedSize(
(Long) value.getValue(), context);
case DOUBLE:
return DoubleAnyValueStatelessMarshaler.INSTANCE.getBinarySerializedSize(
(Double) value.getValue(), context);
case ARRAY:
return StatelessMarshalerUtil.sizeMessageWithContext(
io.opentelemetry.proto.common.v1.internal.AnyValue.ARRAY_VALUE,
(List<AnyValue<?>>) value.getValue(),
ArrayAnyValueStatelessMarshaler.INSTANCE,
context);
case KEY_VALUE_LIST:
return StatelessMarshalerUtil.sizeMessageWithContext(
io.opentelemetry.proto.common.v1.internal.AnyValue.KVLIST_VALUE,
(List<KeyAnyValue>) value.getValue(),
KeyValueListAnyValueStatelessMarshaler.INSTANCE,
context);
case BYTES:
return BytesAnyValueStatelessMarshaler.INSTANCE.getBinarySerializedSize(
(ByteBuffer) value.getValue(), context);
}
// Error prone ensures the switch statement is complete, otherwise only can happen with
// unaligned versions which are not supported.
throw new IllegalArgumentException("Unsupported value type.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,84 +5,32 @@

package io.opentelemetry.exporter.internal.otlp;

import io.opentelemetry.api.common.AttributeType;
import io.opentelemetry.api.incubator.logs.AnyValue;
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler2;
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
import io.opentelemetry.proto.common.v1.internal.ArrayValue;
import java.io.IOException;
import java.util.List;

/** See {@link ArrayAnyValueMarshaler}. */
// TODO: add support for List<io.opentelemetry.api.incubator.logs.AnyValue<?>>
final class ArrayAnyValueStatelessMarshaler<T>
implements StatelessMarshaler2<AttributeType, List<T>> {
static final ArrayAnyValueStatelessMarshaler<Object> INSTANCE =
new ArrayAnyValueStatelessMarshaler<>();
/** A Marshaler of key value pairs. See {@link ArrayAnyValueMarshaler}. */
final class ArrayAnyValueStatelessMarshaler implements StatelessMarshaler<List<AnyValue<?>>> {

static final ArrayAnyValueStatelessMarshaler INSTANCE = new ArrayAnyValueStatelessMarshaler();

private ArrayAnyValueStatelessMarshaler() {}

@SuppressWarnings("unchecked")
@Override
public void writeTo(Serializer output, AttributeType type, List<T> list, MarshalerContext context)
public void writeTo(Serializer output, List<AnyValue<?>> value, MarshalerContext context)
throws IOException {
switch (type) {
case STRING_ARRAY:
output.serializeRepeatedMessageWithContext(
ArrayValue.VALUES,
(List<String>) list,
StringAnyValueStatelessMarshaler.INSTANCE,
context);
return;
case LONG_ARRAY:
output.serializeRepeatedMessageWithContext(
ArrayValue.VALUES, (List<Long>) list, IntAnyValueStatelessMarshaler.INSTANCE, context);
return;
case BOOLEAN_ARRAY:
output.serializeRepeatedMessageWithContext(
ArrayValue.VALUES,
(List<Boolean>) list,
BoolAnyValueStatelessMarshaler.INSTANCE,
context);
return;
case DOUBLE_ARRAY:
output.serializeRepeatedMessageWithContext(
ArrayValue.VALUES,
(List<Double>) list,
DoubleAnyValueStatelessMarshaler.INSTANCE,
context);
return;
default:
throw new IllegalArgumentException("Unsupported attribute type.");
}
output.serializeRepeatedMessageWithContext(
ArrayValue.VALUES, value, AnyValueStatelessMarshaler.INSTANCE, context);
}

@SuppressWarnings("unchecked")
@Override
public int getBinarySerializedSize(AttributeType type, List<T> list, MarshalerContext context) {
switch (type) {
case STRING_ARRAY:
return StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
ArrayValue.VALUES,
(List<String>) list,
StringAnyValueStatelessMarshaler.INSTANCE,
context);
case LONG_ARRAY:
return StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
ArrayValue.VALUES, (List<Long>) list, IntAnyValueStatelessMarshaler.INSTANCE, context);
case BOOLEAN_ARRAY:
return StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
ArrayValue.VALUES,
(List<Boolean>) list,
BoolAnyValueStatelessMarshaler.INSTANCE,
context);
case DOUBLE_ARRAY:
return StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
ArrayValue.VALUES,
(List<Double>) list,
DoubleAnyValueStatelessMarshaler.INSTANCE,
context);
default:
throw new IllegalArgumentException("Unsupported attribute type.");
}
public int getBinarySerializedSize(List<AnyValue<?>> value, MarshalerContext context) {
return StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
ArrayValue.VALUES, value, AnyValueStatelessMarshaler.INSTANCE, context);
}
}
Loading

0 comments on commit 715211e

Please sign in to comment.