Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zstd compressor implementation for OTLP exporters #1108

Merged
merged 4 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/component_owners.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ components:
aws-xray-propagator:
- wangzlei
- srprash
compressors:
- jack-berg
consistent-sampling:
- oertl
- PeterF778
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ feature or via instrumentation, this project is hopefully for you.
* [AWS Resources](./aws-resources/README.md)
* [AWS X-Ray SDK Support](./aws-xray/README.md)
* [AWS X-Ray Propagator](./aws-xray-propagator/README.md)
* [zstd Compressor](./compressors/zstd/README.md)
* [Consistent Sampling](./consistent-sampling/README.md)
* [Disk Buffering](./disk-buffering/README.md)
* [JMX Metric Gatherer](./jmx-metrics/README.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ val DEFAULT_JAVA_VERSION = JavaVersion.VERSION_17
java {
toolchain {
languageVersion.set(
otelJava.minJavaVersionSupported.map { JavaLanguageVersion.of(Math.max(it.majorVersion.toInt(), DEFAULT_JAVA_VERSION.majorVersion.toInt())) }
otelJava.minJavaVersionSupported.map { JavaLanguageVersion.of(Math.max(it.majorVersion.toInt(), DEFAULT_JAVA_VERSION.majorVersion.toInt())) }
)
}

Expand Down
49 changes: 49 additions & 0 deletions compressors/compressor-zstd/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# zstd Compressor

A [zstd](https://en.wikipedia.org/wiki/Zstd) implementation of [Compressor](https://github.com/open-telemetry/opentelemetry-java/blob/d9f9812d4375a4229caff43bd681c50b7a45776a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/Compressor.java) and [CompressorProvider](https://github.com/open-telemetry/opentelemetry-java/blob/d9f9812d4375a4229caff43bd681c50b7a45776a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorProvider.java) based on [luben/zstd-jni](https://github.com/luben/zstd-jni).

This enables zstd compression with [opentelemetry-java's](https://github.com/open-telemetry/opentelemetry-java) [OTLP exporters](https://opentelemetry.io/docs/instrumentation/java/exporters/#otlp).

## Usage

Add dependency, replacing `{{version}}` with the latest release version.

**Maven:**

```xml
<dependency>
<groupId>io.opentelemetry.contrib</groupId>
<artifactId>opentelemetry-compressor-zstd</artifactId>
<version>{{version}}</version>
</dependency>
```

**Gradle:**

```groovy
dependencies {
implementation "io.opentelemetry.contrib:opentelemetry-compressor-zstd:{{version}}"
}
```

If programmatically configuring the exporter:

```java
// same pattern applies to OtlpHttpMetricExporter, OtlpHttpSpanExporter, and the gRPC variants
OtlpHttpLogRecordExporter.builder()
.setCompression("zstd")
// ...additional configuration omitted for brevity
.build()
```

If using [autoconfigure](https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/autoconfigure):

```shell
export OTEL_EXPORTER_OTLP_COMPRESSION=zstd
```

## Component owners

- [Jack Berg](https://github.com/jack-berg), New Relic

Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).
21 changes: 21 additions & 0 deletions compressors/compressor-zstd/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
}

description = "zstd compressor implementation for use with OTLP exporters"
otelJava.moduleName.set("io.opentelemetry.contrib.compressor.zstd")

dependencies {
// TODO(jack-berg): Use version from :depedencyManagement when opentelemetry-instrumentation-bom-alpha depends on opentelemetry-java 1.34.0
var openTelemetryVersion = "1.34.0"
api("io.opentelemetry:opentelemetry-exporter-common:$openTelemetryVersion")

implementation("com.github.luben:zstd-jni:1.5.5-10")

testImplementation("io.opentelemetry:opentelemetry-sdk-testing:$openTelemetryVersion")
testImplementation("io.opentelemetry:opentelemetry-exporter-otlp:$openTelemetryVersion")

testImplementation("io.opentelemetry.proto:opentelemetry-proto")
testImplementation("com.linecorp.armeria:armeria-junit5")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.compressor.zstd;

import com.github.luben.zstd.ZstdOutputStream;
import io.opentelemetry.exporter.internal.compression.Compressor;
import java.io.IOException;
import java.io.OutputStream;

public final class ZstdCompressor implements Compressor {

private static final ZstdCompressor INSTANCE = new ZstdCompressor();

private ZstdCompressor() {}

public static ZstdCompressor getInstance() {
return INSTANCE;
}

@Override
public String getEncoding() {
return "zstd";
}

@Override
public OutputStream compress(OutputStream outputStream) throws IOException {
return new ZstdOutputStream(outputStream);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.compressor.zstd;

import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.compression.CompressorProvider;

public final class ZstdCompressorProvider implements CompressorProvider {
@Override
public Compressor getInstance() {
return ZstdCompressor.getInstance();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.opentelemetry.contrib.compressor.zstd.ZstdCompressorProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.compressor.zstd;

import static org.assertj.core.api.Assertions.assertThat;

import com.github.luben.zstd.ZstdInputStream;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.SeverityNumber;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.logs.TestLogRecordData;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ZstdCompressorProviderTest {

private static final HttpResponse SUCCESS =
HttpResponse.of(
HttpStatus.OK,
MediaType.parse("application/x-protobuf"),
ExportLogsServiceResponse.getDefaultInstance().toByteArray());

private static final ConcurrentLinkedQueue<HttpRequest> httpRequests =
new ConcurrentLinkedQueue<>();
private static final ConcurrentLinkedQueue<ResourceLogs> exportedTelemetry =
new ConcurrentLinkedQueue<>();

@RegisterExtension
static final ServerExtension server =
new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
sb.service(
"/v1/logs",
(ctx, req) -> {
httpRequests.add(ctx.request());
return HttpResponse.of(
req.aggregate()
.thenApply(
aggReq -> {
byte[] payload = aggReq.content().array();
try {
if (req.headers().contains("content-encoding", "zstd")) {
ZstdInputStream is =
new ZstdInputStream(new ByteArrayInputStream(payload));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
for (int result = is.read(); result != -1; result = is.read()) {
baos.write((byte) result);
}
payload = baos.toByteArray();
}
ExportLogsServiceRequest parsed =
ExportLogsServiceRequest.parseFrom(payload);
exportedTelemetry.addAll(parsed.getResourceLogsList());
return SUCCESS;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}));
});
sb.http(0);
}
};

@Test
void exporterWithZstd() {
try (OtlpHttpLogRecordExporter exporter =
OtlpHttpLogRecordExporter.builder()
.setEndpoint(server.httpUri() + "/v1/logs")
.setCompression("zstd")
.build()) {
assertThat(
exporter
.export(Collections.singletonList(generateFakeLogRecordData()))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isTrue();

assertThat(httpRequests)
.satisfiesExactly(
req -> assertThat(req.headers().contains("content-encoding", "zstd")).isTrue());
assertThat(exportedTelemetry)
.satisfiesExactly(
resourceLogs ->
assertThat(resourceLogs.getScopeLogsList())
.satisfiesExactly(
scopeLogs -> {
InstrumentationScope scope = scopeLogs.getScope();
assertThat(scope.getName()).isEqualTo("testLib");
assertThat(scope.getVersion()).isEqualTo("1.0");
assertThat(scopeLogs.getSchemaUrl()).isEqualTo("http://url");
assertThat(scopeLogs.getLogRecordsList())
.satisfiesExactly(
logRecord -> {
assertThat(logRecord.getBody().getStringValue())
.isEqualTo("log body");
assertThat(logRecord.getAttributesList())
.isEqualTo(
Collections.singletonList(
KeyValue.newBuilder()
.setKey("key")
.setValue(
AnyValue.newBuilder()
.setStringValue("value")
.build())
.build()));
assertThat(logRecord.getSeverityText()).isEqualTo("INFO");
assertThat(logRecord.getSeverityNumber())
.isEqualTo(SeverityNumber.SEVERITY_NUMBER_INFO);
assertThat(logRecord.getTimeUnixNano()).isGreaterThan(0);
assertThat(logRecord.getObservedTimeUnixNano())
.isGreaterThan(0);
});
}));
}
}

/** Generate a fake {@link LogRecordData}. */
public static LogRecordData generateFakeLogRecordData() {
return TestLogRecordData.builder()
.setResource(Resource.getDefault())
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("testLib")
.setVersion("1.0")
.setSchemaUrl("http://url")
.build())
.setBody("log body")
.setAttributes(Attributes.builder().put("key", "value").build())
.setSeverity(Severity.INFO)
.setSeverityText(Severity.INFO.name())
.setTimestamp(Instant.now())
.setObservedTimestamp(Instant.now().plusNanos(100))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.compressor.zstd;

import static org.assertj.core.api.Assertions.assertThat;

import com.github.luben.zstd.ZstdInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;

class ZstdCompressorTest {

@Test
void roundTrip() throws IOException {
String content = "hello world";

ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStream os = ZstdCompressor.getInstance().compress(baos);
os.write(content.getBytes(StandardCharsets.UTF_8));
os.close();

byte[] decompressed = new byte[content.length()];
InputStream is = new ZstdInputStream(new ByteArrayInputStream(baos.toByteArray()));
is.read(decompressed);
is.close();

assertThat(new String(decompressed, StandardCharsets.UTF_8)).isEqualTo(content);
}
}
1 change: 1 addition & 0 deletions dependencyManagement/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ val CORE_DEPENDENCIES = listOf(
"com.google.errorprone:error_prone_annotations:${errorProneVersion}",
"com.google.errorprone:error_prone_core:${errorProneVersion}",
"io.github.netmikey.logunit:logunit-jul:2.0.0",
"io.opentelemetry.proto:opentelemetry-proto:1.0.0-alpha",
"io.prometheus:simpleclient:${prometheusVersion}",
"io.prometheus:simpleclient_common:${prometheusVersion}",
"io.prometheus:simpleclient_httpserver:${prometheusVersion}",
Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ include(":all")
include(":aws-resources")
include(":aws-xray")
include(":aws-xray-propagator")
include(":compressors:compressor-zstd")
include(":consistent-sampling")
include(":dependencyManagement")
include(":disk-buffering")
Expand Down
Loading