Skip to content

Commit

Permalink
Break out GrpcSender, GrpcSenderProvider (#5617)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored Aug 9, 2023
1 parent c5cdc80 commit a3bf8af
Show file tree
Hide file tree
Showing 71 changed files with 1,026 additions and 753 deletions.
25 changes: 13 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,19 @@ dependency as follows, replacing `{{artifact-id}}` with the value from the "Arti

### SDK Exporters

| Component | Description | Artifact ID | Version | Javadoc |
|-----------------------------------------------------|--------------------------------------------------------------------|----------------------------------------|-------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| [OTLP Exporters](./exporters/otlp/all) | OTLP gRPC & HTTP exporters, including traces, metrics, and logs | `opentelemetry-exporter-otlp` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-otlp.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-otlp) |
| [OTLP Common](./exporters/otlp/common) | Shared OTLP components (internal) | `opentelemetry-exporter-otlp-common` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-otlp-common.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-otlp-common) |
| [Jaeger gRPC Exporter](./exporters/jaeger) | Jaeger gRPC trace exporter (deprecated [1]) | `opentelemetry-exporter-jaeger` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-jaeger.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-jaeger) |
| [Jaeger Thrift Exporter](./exporters/jaeger-thrift) | Jaeger thrift trace exporter (deprecated [1]) | `opentelemetry-exporter-jaeger-thift` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-jaeger-thrift.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-jaeger-thrift) |
| [Logging Exporter](./exporters/logging) | Logging exporters, including metrics, traces, and logs | `opentelemetry-exporter-logging` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-logging.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-logging) |
| [Zipkin Exporter](./exporters/zipkin) | Zipkin trace exporter | `opentelemetry-exporter-zipkin` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-zipkin.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-zipkin) |
| [Prometheus Exporter](./exporters/prometheus) | Prometheus metric exporter | `opentelemetry-exporter-prometheus` | <!--VERSION_UNSTABLE-->1.28.0-alpha<!--/VERSION_UNSTABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-prometheus.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-prometheus) |
| [Exporter Common](./exporters/common) | Shared exporter components (internal) | `opentelemetry-exporter-common` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-common.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-common) |
| [OkHttp Sender](./exporters/sender/okhttp) | OkHttp implementation of HttpSender (internal) | `opentelemetry-exporter-sender-okhttp` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-sender-okhttp.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-sender-okhttp) |
| [JDK Sender](./exporters/sender/okhttp) | Java 11+ native HttpClient implementation of HttpSender (internal) | `opentelemetry-exporter-sender-jdk` | <!--VERSION_UNSTABLE-->1.28.0-alpha<!--/VERSION_UNSTABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-sender-jdk.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-sender-jdk) | |
| Component | Description | Artifact ID | Version | Javadoc |
|-----------------------------------------------------------------------|--------------------------------------------------------------------|------------------------------------------------------|-------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| [OTLP Exporters](./exporters/otlp/all) | OTLP gRPC & HTTP exporters, including traces, metrics, and logs | `opentelemetry-exporter-otlp` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-otlp.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-otlp) |
| [OTLP Common](./exporters/otlp/common) | Shared OTLP components (internal) | `opentelemetry-exporter-otlp-common` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-otlp-common.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-otlp-common) |
| [Jaeger gRPC Exporter](./exporters/jaeger) | Jaeger gRPC trace exporter (deprecated [1]) | `opentelemetry-exporter-jaeger` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-jaeger.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-jaeger) |
| [Jaeger Thrift Exporter](./exporters/jaeger-thrift) | Jaeger thrift trace exporter (deprecated [1]) | `opentelemetry-exporter-jaeger-thift` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-jaeger-thrift.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-jaeger-thrift) |
| [Logging Exporter](./exporters/logging) | Logging exporters, including metrics, traces, and logs | `opentelemetry-exporter-logging` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-logging.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-logging) |
| [Zipkin Exporter](./exporters/zipkin) | Zipkin trace exporter | `opentelemetry-exporter-zipkin` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-zipkin.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-zipkin) |
| [Prometheus Exporter](./exporters/prometheus) | Prometheus metric exporter | `opentelemetry-exporter-prometheus` | <!--VERSION_UNSTABLE-->1.28.0-alpha<!--/VERSION_UNSTABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-prometheus.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-prometheus) |
| [Exporter Common](./exporters/common) | Shared exporter components (internal) | `opentelemetry-exporter-common` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-common.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-common) |
| [OkHttp Sender](./exporters/sender/okhttp) | OkHttp implementation of HttpSender (internal) | `opentelemetry-exporter-sender-okhttp` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-sender-okhttp.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-sender-okhttp) |
| [JDK Sender](./exporters/sender/okhttp) | Java 11+ native HttpClient implementation of HttpSender (internal) | `opentelemetry-exporter-sender-jdk` | <!--VERSION_UNSTABLE-->1.28.0-alpha<!--/VERSION_UNSTABLE--> | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-sender-jdk.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-sender-jdk) | |
| [gRPC ManagedChannel Sender](./exporters/sender/grpc-managed-channel) | gRPC ManagedChannel implementation of GrpcSender (internal) | `opentelemetry-exporter-sender-grpc-managed-channel` | <!--VERSION_STABLE-->1.28.0<!--/VERSION_STABLE--> | TODO: add link after 1.29.0 | |

**[1]**: Jaeger now
has [native support for OTLP](https://opentelemetry.io/blog/2022/jaeger-native-otlp/) and jaeger
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Comparing source compatibility of against
No changes.
13 changes: 11 additions & 2 deletions exporters/common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ dependencies {
// We include helpers shared by gRPC exporters but do not want to impose these
// dependency on all of our consumers.
compileOnly("com.fasterxml.jackson.core:jackson-core")
compileOnly("com.squareup.okhttp3:okhttp")
compileOnly("io.grpc:grpc-stub")

testImplementation(project(":sdk:common"))

testImplementation("com.google.protobuf:protobuf-java-util")
testImplementation("com.squareup.okhttp3:okhttp")
testImplementation("com.linecorp.armeria:armeria-junit5")
testImplementation("org.skyscreamer:jsonassert")
testImplementation("com.google.api.grpc:proto-google-common-protos")
Expand All @@ -53,6 +51,17 @@ testing {
}
}
}
suites {
register<JvmTestSuite>("testGrpcSenderProvider") {
dependencies {
implementation(project(":exporters:sender:okhttp"))
implementation(project(":exporters:sender:grpc-managed-channel"))

implementation("io.grpc:grpc-stub")
implementation("io.grpc:grpc-netty")
}
}
}
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal;

import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class RetryUtil {

private static final Set<String> RETRYABLE_GRPC_STATUS_CODES;
private static final Set<Integer> RETRYABLE_HTTP_STATUS_CODES =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(429, 502, 503, 504)));

static {
Set<Integer> retryableGrpcStatusCodes = new HashSet<>();
retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_CANCELLED);
retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_DEADLINE_EXCEEDED);
retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_RESOURCE_EXHAUSTED);
retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_ABORTED);
retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_OUT_OF_RANGE);
retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_UNAVAILABLE);
retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_DATA_LOSS);
RETRYABLE_GRPC_STATUS_CODES =
Collections.unmodifiableSet(
retryableGrpcStatusCodes.stream().map(Object::toString).collect(Collectors.toSet()));
}

private RetryUtil() {}

/** Returns the retryable gRPC status codes. */
public static Set<String> retryableGrpcStatusCodes() {
return RETRYABLE_GRPC_STATUS_CODES;
}

/** Returns the retryable HTTP status codes. */
public static Set<Integer> retryableHttpResponseCodes() {
return RETRYABLE_HTTP_STATUS_CODES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,110 @@

package io.opentelemetry.exporter.internal.grpc;

import io.grpc.Channel;
import static io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil.GRPC_STATUS_UNAVAILABLE;
import static io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil.GRPC_STATUS_UNIMPLEMENTED;

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterMetrics;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.net.URI;
import java.util.function.BiFunction;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* An exporter of a {@link Marshaler} using the gRPC wire format.
* Generic gRPC exporter.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface GrpcExporter<T extends Marshaler> {
@SuppressWarnings("checkstyle:JavadocMethod")
public final class GrpcExporter<T extends Marshaler> {

private static final Logger internalLogger = Logger.getLogger(GrpcExporter.class.getName());

private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger);

/** Returns a new {@link GrpcExporterBuilder}. */
static <T extends Marshaler> GrpcExporterBuilder<T> builder(
// We only log unimplemented once since it's a configuration issue that won't be recovered.
private final AtomicBoolean loggedUnimplemented = new AtomicBoolean();
private final AtomicBoolean isShutdown = new AtomicBoolean();

private final String type;
private final GrpcSender<T> grpcSender;
private final ExporterMetrics exporterMetrics;

public GrpcExporter(
String exporterName,
String type,
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcEndpointPath) {
return new GrpcExporterBuilder<>(
exporterName, type, defaultTimeoutSecs, defaultEndpoint, stubFactory, grpcEndpointPath);
GrpcSender<T> grpcSender,
Supplier<MeterProvider> meterProviderSupplier) {
this.type = type;
this.grpcSender = grpcSender;
this.exporterMetrics = ExporterMetrics.createGrpc(exporterName, type, meterProviderSupplier);
}

/**
* Exports the {@code exportRequest} which is a request {@link Marshaler} for {@code numItems}
* items.
*/
CompletableResultCode export(T exportRequest, int numItems);
public CompletableResultCode export(T exportRequest, int numItems) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

exporterMetrics.addSeen(numItems);

/** Shuts the exporter down. */
CompletableResultCode shutdown();
CompletableResultCode result = new CompletableResultCode();

grpcSender.send(
exportRequest,
() -> {
exporterMetrics.addSuccess(numItems);
result.succeed();
},
(response, throwable) -> {
exporterMetrics.addFailed(numItems);
switch (response.grpcStatusValue()) {
case GRPC_STATUS_UNIMPLEMENTED:
if (loggedUnimplemented.compareAndSet(false, true)) {
GrpcExporterUtil.logUnimplemented(
internalLogger, type, response.grpcStatusDescription());
}
break;
case GRPC_STATUS_UNAVAILABLE:
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. Server is UNAVAILABLE. "
+ "Make sure your collector is running and reachable from this network. "
+ "Full error message:"
+ response.grpcStatusDescription());
break;
default:
logger.log(
Level.WARNING,
"Failed to export "
+ type
+ "s. Server responded with gRPC status code "
+ response.grpcStatusValue()
+ ". Error message: "
+ response.grpcStatusDescription());
break;
}
if (logger.isLoggable(Level.FINEST)) {
logger.log(
Level.FINEST, "Failed to export " + type + "s. Details follow: " + throwable);
}
result.fail();
});

return result;
}

public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
return grpcSender.shutdown();
}
}
Loading

0 comments on commit a3bf8af

Please sign in to comment.