Skip to content

Commit

Permalink
Add zstd compressor implementation for OTLP exporters
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg committed Nov 21, 2023
1 parent 870d526 commit 5ad5869
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 2 deletions.
10 changes: 8 additions & 2 deletions buildSrc/src/main/kotlin/otel.java-conventions.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ plugins {
id("otel.spotless-conventions")
}

val otelJava = extensions.create<OtelJavaExtension>("otelJava")

group = "io.opentelemetry.contrib"

base.archivesName.set("opentelemetry-${project.name}")
base {
// May be set already by a parent project, only set if not.
if (!archivesName.get().startsWith("opentelemetry-")) {
archivesName.set("opentelemetry-$name")
}
}

val otelJava = extensions.create<OtelJavaExtension>("otelJava")

// Version to use to compile code and run tests.
val DEFAULT_JAVA_VERSION = JavaVersion.VERSION_17
Expand Down
10 changes: 10 additions & 0 deletions compressors/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
subprojects {
// https://github.com/gradle/gradle/issues/847
group = "io.opentelemetry.compressors"
val proj = this
plugins.withId("java") {
configure<BasePluginExtension> {
archivesName.set("opentelemetry-compressor-${proj.name}")
}
}
}
20 changes: 20 additions & 0 deletions compressors/zstd/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
}

description = "zstd compressor implementation for use with OTLP exporters"
otelJava.moduleName.set("io.opentelemetry.contrib.compressor.zstd")

dependencies {
// TODO: remove -SNAPSHOT versions before merging
api("io.opentelemetry:opentelemetry-exporter-common:1.33.0-SNAPSHOT")

implementation("com.github.luben:zstd-jni:1.5.5-10")

testImplementation("io.opentelemetry:opentelemetry-sdk-testing:1.33.0-SNAPSHOT")
testImplementation("io.opentelemetry:opentelemetry-exporter-otlp:1.33.0-SNAPSHOT")

testImplementation("io.opentelemetry.proto:opentelemetry-proto:1.0.0-alpha")
testImplementation("com.linecorp.armeria:armeria-junit5")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.compressor.zstd;

import com.github.luben.zstd.ZstdOutputStream;
import io.opentelemetry.exporter.internal.compression.Compressor;
import java.io.IOException;
import java.io.OutputStream;

public final class ZstdCompressor implements Compressor {

private static final ZstdCompressor INSTANCE = new ZstdCompressor();

private ZstdCompressor() {}

public static ZstdCompressor getInstance() {
return INSTANCE;
}

@Override
public String getEncoding() {
return "zstd";
}

@Override
public OutputStream compress(OutputStream outputStream) throws IOException {
return new ZstdOutputStream(outputStream);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.compressor.zstd;

import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.compression.CompressorProvider;

public final class ZstdCompressorProvider implements CompressorProvider {
@Override
public Compressor getInstance() {
return ZstdCompressor.getInstance();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.opentelemetry.contrib.compressor.zstd.ZstdCompressorProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.compressor.zstd;

import static org.assertj.core.api.Assertions.assertThat;

import com.github.luben.zstd.ZstdInputStream;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.SeverityNumber;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.logs.TestLogRecordData;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ZstdCompressorProviderTest {

private static final HttpResponse SUCCESS =
HttpResponse.of(
HttpStatus.OK,
MediaType.parse("application/x-protobuf"),
ExportLogsServiceResponse.getDefaultInstance().toByteArray());

private static final ConcurrentLinkedQueue<HttpRequest> httpRequests =
new ConcurrentLinkedQueue<>();
private static final ConcurrentLinkedQueue<ResourceLogs> exportedTelemetry =
new ConcurrentLinkedQueue<>();

@RegisterExtension
static final ServerExtension server =
new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
sb.service(
"/v1/logs",
(ctx, req) -> {
httpRequests.add(ctx.request());
return HttpResponse.of(
req.aggregate()
.thenApply(
aggReq -> {
byte[] payload = aggReq.content().array();
try {
if (req.headers().contains("content-encoding", "zstd")) {
ZstdInputStream is =
new ZstdInputStream(new ByteArrayInputStream(payload));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
for (int result = is.read(); result != -1; result = is.read()) {
baos.write((byte) result);
}
payload = baos.toByteArray();
}
ExportLogsServiceRequest parsed =
ExportLogsServiceRequest.parseFrom(payload);
exportedTelemetry.addAll(parsed.getResourceLogsList());
return SUCCESS;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}));
});
sb.http(0);
}
};

@Test
void exporterWithZstd() {
try (OtlpHttpLogRecordExporter exporter =
OtlpHttpLogRecordExporter.builder()
.setEndpoint(server.httpUri() + "/v1/logs")
.setCompression("zstd")
.build()) {
assertThat(
exporter
.export(Collections.singletonList(generateFakeLogRecordData()))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isTrue();

assertThat(httpRequests)
.satisfiesExactly(
req -> assertThat(req.headers().contains("content-encoding", "zstd")).isTrue());
assertThat(exportedTelemetry)
.satisfiesExactly(
resourceLogs ->
assertThat(resourceLogs.getScopeLogsList())
.satisfiesExactly(
scopeLogs -> {
InstrumentationScope scope = scopeLogs.getScope();
assertThat(scope.getName()).isEqualTo("testLib");
assertThat(scope.getVersion()).isEqualTo("1.0");
assertThat(scopeLogs.getSchemaUrl()).isEqualTo("http://url");
assertThat(scopeLogs.getLogRecordsList())
.satisfiesExactly(
logRecord -> {
assertThat(logRecord.getBody().getStringValue())
.isEqualTo("log body");
assertThat(logRecord.getAttributesList())
.isEqualTo(
Collections.singletonList(
KeyValue.newBuilder()
.setKey("key")
.setValue(
AnyValue.newBuilder()
.setStringValue("value")
.build())
.build()));
assertThat(logRecord.getSeverityText()).isEqualTo("INFO");
assertThat(logRecord.getSeverityNumber())
.isEqualTo(SeverityNumber.SEVERITY_NUMBER_INFO);
assertThat(logRecord.getTimeUnixNano()).isGreaterThan(0);
assertThat(logRecord.getObservedTimeUnixNano())
.isGreaterThan(0);
});
}));
}
}

/** Generate a fake {@link LogRecordData}. */
public static LogRecordData generateFakeLogRecordData() {
return TestLogRecordData.builder()
.setResource(Resource.getDefault())
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("testLib")
.setVersion("1.0")
.setSchemaUrl("http://url")
.build())
.setBody("log body")
.setAttributes(Attributes.builder().put("key", "value").build())
.setSeverity(Severity.INFO)
.setSeverityText(Severity.INFO.name())
.setTimestamp(Instant.now())
.setObservedTimestamp(Instant.now().plusNanos(100))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.compressor.zstd;

import static org.assertj.core.api.Assertions.assertThat;

import com.github.luben.zstd.ZstdInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;

class ZstdCompressorTest {

@Test
void roundTrip() throws IOException {
String content = "hello world";

ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStream os = ZstdCompressor.getInstance().compress(baos);
os.write(content.getBytes(StandardCharsets.UTF_8));
os.close();

byte[] decompressed = new byte[content.length()];
InputStream is = new ZstdInputStream(new ByteArrayInputStream(baos.toByteArray()));
is.read(decompressed);
is.close();

assertThat(new String(decompressed, StandardCharsets.UTF_8)).isEqualTo(content);
}
}
2 changes: 2 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ plugins {
dependencyResolutionManagement {
repositories {
mavenCentral()
mavenLocal()
}
}

Expand Down Expand Up @@ -62,6 +63,7 @@ include(":all")
include(":aws-resources")
include(":aws-xray")
include(":aws-xray-propagator")
include(":compressors:zstd")
include(":consistent-sampling")
include(":dependencyManagement")
include(":disk-buffering")
Expand Down

0 comments on commit 5ad5869

Please sign in to comment.