From a3bf8af42ebdeee2b7bb513464daa214af1f633e Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Wed, 9 Aug 2023 14:45:31 -0500 Subject: [PATCH] Break out GrpcSender, GrpcSenderProvider (#5617) --- README.md | 25 +-- ...y-exporter-sender-grpc-managed-channel.txt | 2 + exporters/common/build.gradle.kts | 13 +- .../exporter/internal/RetryUtil.java | 50 ++++++ .../exporter/internal/grpc/GrpcExporter.java | 111 +++++++++--- .../internal/grpc/GrpcExporterBuilder.java | 161 +++++++++-------- .../internal/grpc/GrpcExporterUtil.java | 39 ++++- .../exporter/internal/grpc/GrpcResponse.java | 28 +++ .../exporter/internal/grpc/GrpcSender.java | 24 +++ .../internal/grpc/GrpcSenderProvider.java | 41 +++++ .../internal/grpc/GrpcStatusUtil.java | 50 ------ .../internal/grpc/ManagedChannelUtil.java | 2 +- .../internal/grpc/UpstreamGrpcExporter.java | 131 -------------- .../exporter/internal/http/HttpExporter.java | 4 +- .../exporter/internal/retry/RetryUtil.java | 47 ----- .../exporter/internal/retry/package-info.java | 10 -- .../internal/auth/AuthenticatorTest.java | 3 +- .../grpc/GrpcExporterBuilderTest.java | 154 ++-------------- .../internal/grpc/GrpcExporterTest.java | 27 +++ ...tilTest.java => GrpcExporterUtilTest.java} | 14 +- .../internal/grpc/GrpcExporterTest.java | 160 +++++++++++++++++ exporters/jaeger/build.gradle.kts | 2 +- .../jaeger/JaegerGrpcSpanExporterBuilder.java | 3 +- .../jaeger/JaegerGrpcSpanExporterTest.java | 15 +- .../JaegerGrpcSpanExporterProviderTest.java | 8 +- exporters/otlp/all/build.gradle.kts | 44 ++++- .../otlp/trace/OltpExporterBenchmark.java | 55 +++--- .../OtlpGrpcLogRecordExporterBuilder.java | 3 +- .../OtlpGrpcMetricExporterBuilder.java | 3 +- .../trace/OtlpGrpcSpanExporterBuilder.java | 3 +- .../logs/OtlpGrpcLogRecordExporterTest.java | 4 +- .../metrics/OtlpGrpcMetricExporterTest.java | 4 +- .../otlp/trace/OtlpGrpcSpanExporterTest.java | 4 +- .../OtlpGrpcNettyLogRecordExporterTest.java | 4 +- .../OtlpGrpcNettyMetricExporterTest.java | 4 +- .../trace/OtlpGrpcNettySpanExporterTest.java | 4 +- ...pGrpcNettyShadedLogRecordExporterTest.java | 4 +- ...OtlpGrpcNettyShadedMetricExporterTest.java | 4 +- .../OtlpGrpcNettyShadedSpanExporterTest.java | 4 +- ...pGrpcNettyOkHttpLogRecordExporterTest.java | 4 +- .../OtlpGrpcOkHttpMetricExporterTest.java | 4 +- .../trace/OtlpGrpcOkHttpSpanExporterTest.java | 4 +- .../AbstractGrpcTelemetryExporterTest.java | 70 +++----- .../GrpcLogRecordExporterBuilderWrapper.java | 4 +- .../GrpcMetricExporterBuilderWrapper.java | 4 +- .../GrpcSpanExporterBuilderWrapper.java | 4 +- .../HttpLogRecordExporterBuilderWrapper.java | 2 +- .../HttpMetricExporterBuilderWrapper.java | 2 +- .../HttpSpanExporterBuilderWrapper.java | 2 +- ...anagedChannelTelemetryExporterBuilder.java | 2 +- .../internal/TelemetryExporterBuilder.java | 3 +- .../grpc-managed-channel/build.gradle.kts | 16 ++ .../internal/UpstreamGrpcSender.java | 66 +++++++ .../internal/UpstreamGrpcSenderProvider.java | 71 ++++++++ .../internal}/package-info.java | 3 +- ....exporter.internal.grpc.GrpcSenderProvider | 1 + exporters/sender/okhttp/build.gradle.kts | 2 + .../okhttp/internal}/GrpcRequestBody.java | 2 +- .../okhttp/internal/OkHttpGrpcSender.java} | 164 ++++++++---------- .../internal/OkHttpGrpcSenderProvider.java | 51 ++++++ .../okhttp/internal/OkHttpHttpSender.java | 4 +- .../sender/okhttp/internal}/OkHttpUtil.java | 2 +- .../okhttp/internal}/RetryInterceptor.java | 2 +- .../sender/okhttp/internal/package-info.java | 1 - ....exporter.internal.grpc.GrpcSenderProvider | 1 + .../internal}/RetryInterceptorTest.java | 2 +- .../SpanExporterConfigurationTest.java | 4 +- .../jaeger-remote-sampler/build.gradle.kts | 1 + .../sampler/JaegerRemoteSamplerBuilder.java | 2 +- .../jaeger/sampler/OkHttpGrpcService.java | 10 +- settings.gradle.kts | 1 + 71 files changed, 1026 insertions(+), 753 deletions(-) create mode 100644 docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-grpc-managed-channel.txt create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/RetryUtil.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcResponse.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSender.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java delete mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcStatusUtil.java delete mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/UpstreamGrpcExporter.java delete mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/RetryUtil.java delete mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/package-info.java create mode 100644 exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java rename exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/{GrpcStatusUtilTest.java => GrpcExporterUtilTest.java} (77%) create mode 100644 exporters/common/src/testGrpcSenderProvider/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java create mode 100644 exporters/sender/grpc-managed-channel/build.gradle.kts create mode 100644 exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java create mode 100644 exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java rename exporters/{common/src/main/java/io/opentelemetry/exporter/internal/okhttp => sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal}/package-info.java (66%) create mode 100644 exporters/sender/grpc-managed-channel/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider rename exporters/{common/src/main/java/io/opentelemetry/exporter/internal/grpc => sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal}/GrpcRequestBody.java (97%) rename exporters/{common/src/main/java/io/opentelemetry/exporter/internal/grpc/OkHttpGrpcExporter.java => sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java} (56%) create mode 100644 exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java rename exporters/{common/src/main/java/io/opentelemetry/exporter/internal/okhttp => sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal}/OkHttpUtil.java (94%) rename exporters/{common/src/main/java/io/opentelemetry/exporter/internal/retry => sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal}/RetryInterceptor.java (98%) create mode 100644 exporters/sender/okhttp/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider rename exporters/{common/src/test/java/io/opentelemetry/exporter/internal/retry => sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal}/RetryInterceptorTest.java (99%) diff --git a/README.md b/README.md index c7ccb1b96f5..a8943cf7f38 100644 --- a/README.md +++ b/README.md @@ -249,18 +249,19 @@ dependency as follows, replacing `{{artifact-id}}` with the value from the "Arti ### SDK Exporters -| Component | Description | Artifact ID | Version | Javadoc | -|-----------------------------------------------------|--------------------------------------------------------------------|----------------------------------------|-------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| [OTLP Exporters](./exporters/otlp/all) | OTLP gRPC & HTTP exporters, including traces, metrics, and logs | `opentelemetry-exporter-otlp` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-otlp.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-otlp) | -| [OTLP Common](./exporters/otlp/common) | Shared OTLP components (internal) | `opentelemetry-exporter-otlp-common` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-otlp-common.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-otlp-common) | -| [Jaeger gRPC Exporter](./exporters/jaeger) | Jaeger gRPC trace exporter (deprecated [1]) | `opentelemetry-exporter-jaeger` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-jaeger.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-jaeger) | -| [Jaeger Thrift Exporter](./exporters/jaeger-thrift) | Jaeger thrift trace exporter (deprecated [1]) | `opentelemetry-exporter-jaeger-thift` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-jaeger-thrift.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-jaeger-thrift) | -| [Logging Exporter](./exporters/logging) | Logging exporters, including metrics, traces, and logs | `opentelemetry-exporter-logging` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-logging.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-logging) | -| [Zipkin Exporter](./exporters/zipkin) | Zipkin trace exporter | `opentelemetry-exporter-zipkin` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-zipkin.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-zipkin) | -| [Prometheus Exporter](./exporters/prometheus) | Prometheus metric exporter | `opentelemetry-exporter-prometheus` | 1.28.0-alpha | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-prometheus.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-prometheus) | -| [Exporter Common](./exporters/common) | Shared exporter components (internal) | `opentelemetry-exporter-common` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-common.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-common) | -| [OkHttp Sender](./exporters/sender/okhttp) | OkHttp implementation of HttpSender (internal) | `opentelemetry-exporter-sender-okhttp` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-sender-okhttp.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-sender-okhttp) | -| [JDK Sender](./exporters/sender/okhttp) | Java 11+ native HttpClient implementation of HttpSender (internal) | `opentelemetry-exporter-sender-jdk` | 1.28.0-alpha | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-sender-jdk.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-sender-jdk) | | +| Component | Description | Artifact ID | Version | Javadoc | +|-----------------------------------------------------------------------|--------------------------------------------------------------------|------------------------------------------------------|-------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| [OTLP Exporters](./exporters/otlp/all) | OTLP gRPC & HTTP exporters, including traces, metrics, and logs | `opentelemetry-exporter-otlp` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-otlp.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-otlp) | +| [OTLP Common](./exporters/otlp/common) | Shared OTLP components (internal) | `opentelemetry-exporter-otlp-common` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-otlp-common.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-otlp-common) | +| [Jaeger gRPC Exporter](./exporters/jaeger) | Jaeger gRPC trace exporter (deprecated [1]) | `opentelemetry-exporter-jaeger` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-jaeger.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-jaeger) | +| [Jaeger Thrift Exporter](./exporters/jaeger-thrift) | Jaeger thrift trace exporter (deprecated [1]) | `opentelemetry-exporter-jaeger-thift` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-jaeger-thrift.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-jaeger-thrift) | +| [Logging Exporter](./exporters/logging) | Logging exporters, including metrics, traces, and logs | `opentelemetry-exporter-logging` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-logging.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-logging) | +| [Zipkin Exporter](./exporters/zipkin) | Zipkin trace exporter | `opentelemetry-exporter-zipkin` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-zipkin.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-zipkin) | +| [Prometheus Exporter](./exporters/prometheus) | Prometheus metric exporter | `opentelemetry-exporter-prometheus` | 1.28.0-alpha | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-prometheus.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-prometheus) | +| [Exporter Common](./exporters/common) | Shared exporter components (internal) | `opentelemetry-exporter-common` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-common.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-common) | +| [OkHttp Sender](./exporters/sender/okhttp) | OkHttp implementation of HttpSender (internal) | `opentelemetry-exporter-sender-okhttp` | 1.28.0 | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-sender-okhttp.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-sender-okhttp) | +| [JDK Sender](./exporters/sender/okhttp) | Java 11+ native HttpClient implementation of HttpSender (internal) | `opentelemetry-exporter-sender-jdk` | 1.28.0-alpha | [![Javadocs](https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporter-sender-jdk.svg)](https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporter-sender-jdk) | | +| [gRPC ManagedChannel Sender](./exporters/sender/grpc-managed-channel) | gRPC ManagedChannel implementation of GrpcSender (internal) | `opentelemetry-exporter-sender-grpc-managed-channel` | 1.28.0 | TODO: add link after 1.29.0 | | **[1]**: Jaeger now has [native support for OTLP](https://opentelemetry.io/blog/2022/jaeger-native-otlp/) and jaeger diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-grpc-managed-channel.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-grpc-managed-channel.txt new file mode 100644 index 00000000000..df26146497b --- /dev/null +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-sender-grpc-managed-channel.txt @@ -0,0 +1,2 @@ +Comparing source compatibility of against +No changes. \ No newline at end of file diff --git a/exporters/common/build.gradle.kts b/exporters/common/build.gradle.kts index 98ea5b45597..d6cb5ebe86f 100644 --- a/exporters/common/build.gradle.kts +++ b/exporters/common/build.gradle.kts @@ -21,13 +21,11 @@ dependencies { // We include helpers shared by gRPC exporters but do not want to impose these // dependency on all of our consumers. compileOnly("com.fasterxml.jackson.core:jackson-core") - compileOnly("com.squareup.okhttp3:okhttp") compileOnly("io.grpc:grpc-stub") testImplementation(project(":sdk:common")) testImplementation("com.google.protobuf:protobuf-java-util") - testImplementation("com.squareup.okhttp3:okhttp") testImplementation("com.linecorp.armeria:armeria-junit5") testImplementation("org.skyscreamer:jsonassert") testImplementation("com.google.api.grpc:proto-google-common-protos") @@ -53,6 +51,17 @@ testing { } } } + suites { + register("testGrpcSenderProvider") { + dependencies { + implementation(project(":exporters:sender:okhttp")) + implementation(project(":exporters:sender:grpc-managed-channel")) + + implementation("io.grpc:grpc-stub") + implementation("io.grpc:grpc-netty") + } + } + } } tasks { diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/RetryUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/RetryUtil.java new file mode 100644 index 00000000000..3e3b66af8ac --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/RetryUtil.java @@ -0,0 +1,50 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal; + +import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class RetryUtil { + + private static final Set RETRYABLE_GRPC_STATUS_CODES; + private static final Set RETRYABLE_HTTP_STATUS_CODES = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(429, 502, 503, 504))); + + static { + Set retryableGrpcStatusCodes = new HashSet<>(); + retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_CANCELLED); + retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_DEADLINE_EXCEEDED); + retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_RESOURCE_EXHAUSTED); + retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_ABORTED); + retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_OUT_OF_RANGE); + retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_UNAVAILABLE); + retryableGrpcStatusCodes.add(GrpcExporterUtil.GRPC_STATUS_DATA_LOSS); + RETRYABLE_GRPC_STATUS_CODES = + Collections.unmodifiableSet( + retryableGrpcStatusCodes.stream().map(Object::toString).collect(Collectors.toSet())); + } + + private RetryUtil() {} + + /** Returns the retryable gRPC status codes. */ + public static Set retryableGrpcStatusCodes() { + return RETRYABLE_GRPC_STATUS_CODES; + } + + /** Returns the retryable HTTP status codes. */ + public static Set retryableHttpResponseCodes() { + return RETRYABLE_HTTP_STATUS_CODES; + } +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java index 50e3b99238a..6a74ba5df8a 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java @@ -5,39 +5,110 @@ package io.opentelemetry.exporter.internal.grpc; -import io.grpc.Channel; +import static io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil.GRPC_STATUS_UNAVAILABLE; +import static io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil.GRPC_STATUS_UNIMPLEMENTED; + +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.ExporterMetrics; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.CompletableResultCode; -import java.net.URI; -import java.util.function.BiFunction; +import io.opentelemetry.sdk.internal.ThrottlingLogger; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; /** - * An exporter of a {@link Marshaler} using the gRPC wire format. + * Generic gRPC exporter. * *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public interface GrpcExporter { +@SuppressWarnings("checkstyle:JavadocMethod") +public final class GrpcExporter { + + private static final Logger internalLogger = Logger.getLogger(GrpcExporter.class.getName()); + + private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); - /** Returns a new {@link GrpcExporterBuilder}. */ - static GrpcExporterBuilder builder( + // We only log unimplemented once since it's a configuration issue that won't be recovered. + private final AtomicBoolean loggedUnimplemented = new AtomicBoolean(); + private final AtomicBoolean isShutdown = new AtomicBoolean(); + + private final String type; + private final GrpcSender grpcSender; + private final ExporterMetrics exporterMetrics; + + public GrpcExporter( String exporterName, String type, - long defaultTimeoutSecs, - URI defaultEndpoint, - Supplier>> stubFactory, - String grpcEndpointPath) { - return new GrpcExporterBuilder<>( - exporterName, type, defaultTimeoutSecs, defaultEndpoint, stubFactory, grpcEndpointPath); + GrpcSender grpcSender, + Supplier meterProviderSupplier) { + this.type = type; + this.grpcSender = grpcSender; + this.exporterMetrics = ExporterMetrics.createGrpc(exporterName, type, meterProviderSupplier); } - /** - * Exports the {@code exportRequest} which is a request {@link Marshaler} for {@code numItems} - * items. - */ - CompletableResultCode export(T exportRequest, int numItems); + public CompletableResultCode export(T exportRequest, int numItems) { + if (isShutdown.get()) { + return CompletableResultCode.ofFailure(); + } + + exporterMetrics.addSeen(numItems); - /** Shuts the exporter down. */ - CompletableResultCode shutdown(); + CompletableResultCode result = new CompletableResultCode(); + + grpcSender.send( + exportRequest, + () -> { + exporterMetrics.addSuccess(numItems); + result.succeed(); + }, + (response, throwable) -> { + exporterMetrics.addFailed(numItems); + switch (response.grpcStatusValue()) { + case GRPC_STATUS_UNIMPLEMENTED: + if (loggedUnimplemented.compareAndSet(false, true)) { + GrpcExporterUtil.logUnimplemented( + internalLogger, type, response.grpcStatusDescription()); + } + break; + case GRPC_STATUS_UNAVAILABLE: + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. Server is UNAVAILABLE. " + + "Make sure your collector is running and reachable from this network. " + + "Full error message:" + + response.grpcStatusDescription()); + break; + default: + logger.log( + Level.WARNING, + "Failed to export " + + type + + "s. Server responded with gRPC status code " + + response.grpcStatusValue() + + ". Error message: " + + response.grpcStatusDescription()); + break; + } + if (logger.isLoggable(Level.FINEST)) { + logger.log( + Level.FINEST, "Failed to export " + type + "s. Details follow: " + throwable); + } + result.fail(); + }); + + return result; + } + + public CompletableResultCode shutdown() { + if (!isShutdown.compareAndSet(false, true)) { + logger.log(Level.INFO, "Calling shutdown() multiple times."); + return CompletableResultCode.ofSuccess(); + } + return grpcSender.shutdown(); + } } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java index 8845c381cf2..31eb7745d54 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java @@ -6,35 +6,28 @@ package io.opentelemetry.exporter.internal.grpc; import io.grpc.Channel; -import io.grpc.ClientInterceptors; -import io.grpc.Codec; import io.grpc.ManagedChannel; -import io.grpc.Metadata; -import io.grpc.stub.MetadataUtils; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.internal.ConfigUtil; import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.exporter.internal.ExporterBuilderUtil; import io.opentelemetry.exporter.internal.TlsConfigHelper; import io.opentelemetry.exporter.internal.marshal.Marshaler; -import io.opentelemetry.exporter.internal.okhttp.OkHttpUtil; -import io.opentelemetry.exporter.internal.retry.RetryInterceptor; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.net.URI; import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.ServiceLoader; import java.util.StringJoiner; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import javax.net.ssl.X509TrustManager; -import okhttp3.Headers; -import okhttp3.OkHttpClient; -import okhttp3.Protocol; /** * A builder for {@link GrpcExporter}. @@ -45,6 +38,8 @@ @SuppressWarnings("JavadocMethod") public class GrpcExporterBuilder { + private static final Logger LOGGER = Logger.getLogger(GrpcExporterBuilder.class.getName()); + private final String exporterName; private final String type; private final String grpcEndpointPath; @@ -62,7 +57,7 @@ public class GrpcExporterBuilder { // Use Object type since gRPC may not be on the classpath. @Nullable private Object grpcChannel; - GrpcExporterBuilder( + public GrpcExporterBuilder( String exporterName, String type, long defaultTimeoutSecs, @@ -158,49 +153,22 @@ public GrpcExporterBuilder copy() { } public GrpcExporter build() { - if (grpcChannel != null) { - return new UpstreamGrpcExporterFactory().buildWithChannel((Channel) grpcChannel); - } - - OkHttpClient.Builder clientBuilder = - new OkHttpClient.Builder().dispatcher(OkHttpUtil.newDispatcher()); - - clientBuilder.callTimeout(Duration.ofNanos(timeoutNanos)); - - SSLContext sslContext = tlsConfigHelper.getSslContext(); - X509TrustManager trustManager = tlsConfigHelper.getTrustManager(); - if (sslContext != null && trustManager != null) { - clientBuilder.sslSocketFactory(sslContext.getSocketFactory(), trustManager); - } - - String endpoint = this.endpoint.resolve(grpcEndpointPath).toString(); - if (endpoint.startsWith("http://")) { - clientBuilder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE)); - } else { - clientBuilder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1)); - } - - Headers.Builder headers = new Headers.Builder(); - this.headers.forEach(headers::add); - - headers.add("te", "trailers"); - if (compressionEnabled) { - headers.add("grpc-encoding", "gzip"); - } - - if (retryPolicy != null) { - clientBuilder.addInterceptor( - new RetryInterceptor(retryPolicy, OkHttpGrpcExporter::isRetryable)); - } + GrpcSenderProvider grpcSenderProvider = resolveGrpcSenderProvider(); + GrpcSender grpcSender = + grpcSenderProvider.createSender( + endpoint, + grpcEndpointPath, + compressionEnabled, + timeoutNanos, + headers, + grpcChannel, + grpcStubFactory, + retryPolicy, + tlsConfigHelper.getSslContext(), + tlsConfigHelper.getTrustManager()); + LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName()); - return new OkHttpGrpcExporter<>( - exporterName, - type, - clientBuilder.build(), - meterProviderSupplier, - endpoint, - headers.build(), - compressionEnabled); + return new GrpcExporter<>(exporterName, type, grpcSender, meterProviderSupplier); } public String toString(boolean includePrefixAndSuffix) { @@ -233,34 +201,63 @@ public String toString() { return toString(true); } - // Use an inner class to ensure GrpcExporterBuilder does not have classloading dependencies on - // upstream gRPC. - private class UpstreamGrpcExporterFactory { - private GrpcExporter buildWithChannel(Channel channel) { - Metadata metadata = new Metadata(); - String authorityOverride = null; - for (Map.Entry entry : headers.entrySet()) { - String name = entry.getKey(); - String value = entry.getValue(); - if (name.equals("host")) { - authorityOverride = value; - continue; - } - metadata.put(Metadata.Key.of(name, Metadata.ASCII_STRING_MARSHALLER), value); - } - - channel = - ClientInterceptors.intercept( - channel, MetadataUtils.newAttachHeadersInterceptor(metadata)); - - Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; - MarshalerServiceStub stub = - grpcStubFactory - .get() - .apply(channel, authorityOverride) - .withCompression(codec.getMessageEncoding()); - return new UpstreamGrpcExporter<>( - exporterName, type, stub, meterProviderSupplier, timeoutNanos); + /** + * Resolve the {@link GrpcSenderProvider}. + * + *

If no {@link GrpcSenderProvider} is available, throw {@link IllegalStateException}. + * + *

If only one {@link GrpcSenderProvider} is available, use it. + * + *

If multiple are available and.. + * + *

    + *
  • {@code io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider} is empty, use the + * first found. + *
  • {@code io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider} is set, use the + * matching provider. If none match, throw {@link IllegalStateException}. + *
+ */ + private static GrpcSenderProvider resolveGrpcSenderProvider() { + Map grpcSenderProviders = new HashMap<>(); + for (GrpcSenderProvider spi : + ServiceLoader.load(GrpcSenderProvider.class, GrpcExporterBuilder.class.getClassLoader())) { + grpcSenderProviders.put(spi.getClass().getName(), spi); } + + // No provider on classpath, throw + if (grpcSenderProviders.isEmpty()) { + throw new IllegalStateException( + "No GrpcSenderProvider found on classpath. Please add dependency on " + + "opentelemetry-exporter-sender-okhttp or opentelemetry-exporter-sender-grpc-upstream"); + } + + // Exactly one provider on classpath, use it + if (grpcSenderProviders.size() == 1) { + return grpcSenderProviders.values().stream().findFirst().get(); + } + + // If we've reached here, there are multiple GrpcSenderProviders + String configuredSender = + ConfigUtil.getString("io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider", ""); + + // Multiple providers but none configured, use first we find and log a warning + if (configuredSender.isEmpty()) { + LOGGER.log( + Level.WARNING, + "Multiple GrpcSenderProvider found. Please include only one, " + + "or specify preference setting io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider " + + "to the FQCN of the preferred provider."); + return grpcSenderProviders.values().stream().findFirst().get(); + } + + // Multiple providers with configuration match, use configuration match + if (grpcSenderProviders.containsKey(configuredSender)) { + return grpcSenderProviders.get(configuredSender); + } + + // Multiple providers, configured does not match, throw + throw new IllegalStateException( + "No GrpcSenderProvider matched configured io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider: " + + configuredSender); } } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterUtil.java index 2e019cfc400..2747b6a260e 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterUtil.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterUtil.java @@ -5,11 +5,27 @@ package io.opentelemetry.exporter.internal.grpc; +import io.opentelemetry.exporter.internal.marshal.CodedInputStream; +import java.io.IOException; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; -final class GrpcExporterUtil { +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class GrpcExporterUtil { + + public static final int GRPC_STATUS_CANCELLED = 1; + public static final int GRPC_STATUS_UNKNOWN = 2; + public static final int GRPC_STATUS_DEADLINE_EXCEEDED = 4; + public static final int GRPC_STATUS_RESOURCE_EXHAUSTED = 8; + public static final int GRPC_STATUS_ABORTED = 10; + public static final int GRPC_STATUS_OUT_OF_RANGE = 11; + public static final int GRPC_STATUS_UNIMPLEMENTED = 12; + public static final int GRPC_STATUS_UNAVAILABLE = 14; + public static final int GRPC_STATUS_DATA_LOSS = 15; static void logUnimplemented(Logger logger, String type, @Nullable String fullErrorMessage) { String envVar; @@ -44,4 +60,25 @@ static void logUnimplemented(Logger logger, String type, @Nullable String fullEr } private GrpcExporterUtil() {} + + /** Parses the message out of a serialized gRPC Status. */ + public static String getStatusMessage(byte[] serializedStatus) throws IOException { + CodedInputStream input = CodedInputStream.newInstance(serializedStatus); + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 18: + return input.readStringRequireUtf8(); + default: + input.skipField(tag); + break; + } + } + // Serialized Status proto had no message, proto always defaults to empty string when not found. + return ""; + } } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcResponse.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcResponse.java new file mode 100644 index 00000000000..01ac2f53e6f --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcResponse.java @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.grpc; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +@AutoValue +public abstract class GrpcResponse { + + GrpcResponse() {} + + public static GrpcResponse create(int grpcStatusValue, @Nullable String grpcStatusDescription) { + return new AutoValue_GrpcResponse(grpcStatusValue, grpcStatusDescription); + } + + public abstract int grpcStatusValue(); + + @Nullable + public abstract String grpcStatusDescription(); +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSender.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSender.java new file mode 100644 index 00000000000..d2dc05b16fd --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSender.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.grpc; + +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.util.function.BiConsumer; + +/** + * An exporter of a messages encoded by {@link Marshaler} using the gRPC wire format. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public interface GrpcSender { + + void send(T request, Runnable onSuccess, BiConsumer onError); + + /** Shutdown the sender. */ + CompletableResultCode shutdown(); +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java new file mode 100644 index 00000000000..f3cacdf5409 --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.grpc; + +import io.grpc.Channel; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.net.URI; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.net.ssl.X509TrustManager; + +/** + * A service provider interface (SPI) for providing {@link GrpcSender}s backed by different client + * libraries. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public interface GrpcSenderProvider { + + /** Returns a {@link GrpcSender} configured with the provided parameters. */ + @SuppressWarnings("TooManyParameters") + GrpcSender createSender( + URI endpoint, + String endpointPath, + boolean compressionEnabled, + long timeoutNanos, + Map headers, + @Nullable Object managedChannel, + Supplier>> stubFactory, + @Nullable RetryPolicy retryPolicy, + @Nullable SSLContext sslContext, + @Nullable X509TrustManager trustManager); +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcStatusUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcStatusUtil.java deleted file mode 100644 index a906f65684b..00000000000 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcStatusUtil.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.internal.grpc; - -import io.opentelemetry.exporter.internal.marshal.CodedInputStream; -import java.io.IOException; - -/** - * Utilities for working with gRPC status without requiring dependencies on gRPC. - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - */ -public final class GrpcStatusUtil { - - public static final String GRPC_STATUS_CANCELLED = "1"; - public static final String GRPC_STATUS_DEADLINE_EXCEEDED = "4"; - public static final String GRPC_STATUS_RESOURCE_EXHAUSTED = "8"; - public static final String GRPC_STATUS_ABORTED = "10"; - public static final String GRPC_STATUS_OUT_OF_RANGE = "11"; - public static final String GRPC_STATUS_UNIMPLEMENTED = "12"; - public static final String GRPC_STATUS_UNAVAILABLE = "14"; - public static final String GRPC_STATUS_DATA_LOSS = "15"; - - /** Parses the message out of a serialized gRPC Status. */ - public static String getStatusMessage(byte[] serializedStatus) throws IOException { - CodedInputStream input = CodedInputStream.newInstance(serializedStatus); - boolean done = false; - while (!done) { - int tag = input.readTag(); - switch (tag) { - case 0: - done = true; - break; - case 18: - return input.readStringRequireUtf8(); - default: - input.skipField(tag); - break; - } - } - // Serialized Status proto had no message, proto always defaults to empty string when not found. - return ""; - } - - private GrpcStatusUtil() {} -} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ManagedChannelUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ManagedChannelUtil.java index 4af57082129..de4adb4512e 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ManagedChannelUtil.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/ManagedChannelUtil.java @@ -9,7 +9,7 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.opentelemetry.exporter.internal.retry.RetryUtil; +import io.opentelemetry.exporter.internal.RetryUtil; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.util.Collections; diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/UpstreamGrpcExporter.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/UpstreamGrpcExporter.java deleted file mode 100644 index 8be6a425193..00000000000 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/UpstreamGrpcExporter.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.internal.grpc; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; -import io.grpc.Status; -import io.opentelemetry.api.metrics.MeterProvider; -import io.opentelemetry.exporter.internal.ExporterMetrics; -import io.opentelemetry.exporter.internal.marshal.Marshaler; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.internal.ThrottlingLogger; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; -import java.util.logging.Level; -import java.util.logging.Logger; -import org.checkerframework.checker.nullness.qual.Nullable; - -/** - * A {@link GrpcExporter} which uses the upstream grpc-java library. - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - */ -public final class UpstreamGrpcExporter implements GrpcExporter { - - private static final Logger internalLogger = - Logger.getLogger(UpstreamGrpcExporter.class.getName()); - - private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); - - // We only log unavailable once since it's a configuration issue that won't be recovered. - private final AtomicBoolean loggedUnimplemented = new AtomicBoolean(); - private final AtomicBoolean isShutdown = new AtomicBoolean(); - - private final String type; - private final ExporterMetrics exporterMetrics; - private final MarshalerServiceStub stub; - private final long timeoutNanos; - - /** Creates a new {@link UpstreamGrpcExporter}. */ - UpstreamGrpcExporter( - String exporterName, - String type, - MarshalerServiceStub stub, - Supplier meterProviderSupplier, - long timeoutNanos) { - this.type = type; - this.exporterMetrics = ExporterMetrics.createGrpc(exporterName, type, meterProviderSupplier); - this.timeoutNanos = timeoutNanos; - this.stub = stub; - } - - @Override - public CompletableResultCode export(T exportRequest, int numItems) { - if (isShutdown.get()) { - return CompletableResultCode.ofFailure(); - } - - exporterMetrics.addSeen(numItems); - - CompletableResultCode result = new CompletableResultCode(); - - MarshalerServiceStub stub = this.stub; - if (timeoutNanos > 0) { - stub = stub.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS); - } - Futures.addCallback( - stub.export(exportRequest), - new FutureCallback() { - @Override - public void onSuccess(@Nullable Object unused) { - exporterMetrics.addSuccess(numItems); - result.succeed(); - } - - @Override - public void onFailure(Throwable t) { - exporterMetrics.addFailed(numItems); - Status status = Status.fromThrowable(t); - switch (status.getCode()) { - case UNIMPLEMENTED: - if (loggedUnimplemented.compareAndSet(false, true)) { - GrpcExporterUtil.logUnimplemented(internalLogger, type, status.getDescription()); - } - break; - case UNAVAILABLE: - logger.log( - Level.SEVERE, - "Failed to export " - + type - + "s. Server is UNAVAILABLE. " - + "Make sure your collector is running and reachable from this network. " - + "Full error message:" - + status.getDescription()); - break; - default: - logger.log( - Level.WARNING, - "Failed to export " - + type - + "s. Server responded with gRPC status code " - + status.getCode().value() - + ". Error message: " - + status.getDescription()); - break; - } - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "Failed to export " + type + "s. Details follow: " + t); - } - result.fail(); - } - }, - MoreExecutors.directExecutor()); - - return result; - } - - @Override - public CompletableResultCode shutdown() { - if (!isShutdown.compareAndSet(false, true)) { - logger.log(Level.INFO, "Calling shutdown() multiple times."); - } - return CompletableResultCode.ofSuccess(); - } -} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java index f13f018c9b9..f7af7150f74 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java @@ -7,7 +7,7 @@ import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.exporter.internal.ExporterMetrics; -import io.opentelemetry.exporter.internal.grpc.GrpcStatusUtil; +import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.internal.ThrottlingLogger; @@ -137,7 +137,7 @@ private static String extractErrorStatus(String statusMessage, @Nullable byte[] return "Response body missing, HTTP status message: " + statusMessage; } try { - return GrpcStatusUtil.getStatusMessage(responseBody); + return GrpcExporterUtil.getStatusMessage(responseBody); } catch (IOException e) { return "Unable to parse response body, HTTP status message: " + statusMessage; } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/RetryUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/RetryUtil.java deleted file mode 100644 index 83eef920fce..00000000000 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/RetryUtil.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.internal.retry; - -import io.opentelemetry.exporter.internal.grpc.GrpcStatusUtil; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -/** - * This class is internal and is hence not for public use. Its APIs are unstable and can change at - * any time. - */ -public class RetryUtil { - - private static final Set RETRYABLE_GRPC_STATUS_CODES; - private static final Set RETRYABLE_HTTP_STATUS_CODES = - Collections.unmodifiableSet(new HashSet<>(Arrays.asList(429, 502, 503, 504))); - - static { - Set retryableGrpcStatusCodes = new HashSet<>(); - retryableGrpcStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_CANCELLED); - retryableGrpcStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_DEADLINE_EXCEEDED); - retryableGrpcStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_RESOURCE_EXHAUSTED); - retryableGrpcStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_ABORTED); - retryableGrpcStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_OUT_OF_RANGE); - retryableGrpcStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_UNAVAILABLE); - retryableGrpcStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_DATA_LOSS); - RETRYABLE_GRPC_STATUS_CODES = Collections.unmodifiableSet(retryableGrpcStatusCodes); - } - - private RetryUtil() {} - - /** Returns the retryable gRPC status codes. */ - public static Set retryableGrpcStatusCodes() { - return RETRYABLE_GRPC_STATUS_CODES; - } - - /** Returns the retryable HTTP status codes. */ - public static Set retryableHttpResponseCodes() { - return RETRYABLE_HTTP_STATUS_CODES; - } -} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/package-info.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/package-info.java deleted file mode 100644 index 7564da39e60..00000000000 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/package-info.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -/** Logic for retrying of export requests. */ -@ParametersAreNonnullByDefault -package io.opentelemetry.exporter.internal.retry; - -import javax.annotation.ParametersAreNonnullByDefault; diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java index 9722aeb617d..93e07e12d41 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java @@ -59,8 +59,7 @@ void setAuthenticatorOnDelegate_Fail() { assertThatThrownBy( () -> Authenticator.setAuthenticatorOnDelegate( - new WithDelegate(GrpcExporter.builder(null, null, 0, null, null, null)), - authenticator)) + new WithDelegate(new GrpcExporter<>(null, null, null, null)), authenticator)) .isInstanceOf(IllegalArgumentException.class); } diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilderTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilderTest.java index 9bdf242cf6d..dcf1ea64387 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilderTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilderTest.java @@ -6,178 +6,46 @@ package io.opentelemetry.exporter.internal.grpc; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.Codec; -import io.grpc.ManagedChannel; import io.opentelemetry.exporter.internal.marshal.Marshaler; import java.net.URI; -import java.util.function.BiFunction; -import java.util.function.Supplier; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class GrpcExporterBuilderTest { - private final ManagedChannel channel = mock(ManagedChannel.class); - private GrpcExporterBuilder builder; @BeforeEach - @SuppressWarnings("unchecked") void setUp() { - Supplier>> grpcStubFactory = - mock(Supplier.class); - when(grpcStubFactory.get()) - .thenReturn((c, s) -> new TestMarshalerServiceStub(c, CallOptions.DEFAULT)); - builder = - GrpcExporter.builder( - "otlp", "span", 0, URI.create("http://localhost:4317"), grpcStubFactory, "/test"); + new GrpcExporterBuilder<>( + "otlp", "span", 0, URI.create("http://localhost:4317"), null, "/test"); } @Test void compressionDefault() { - GrpcExporter exporter = builder.build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - OkHttpGrpcExporter.class, - otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false)); - } finally { - exporter.shutdown(); - } + assertThat(builder).extracting("compressionEnabled").isEqualTo(false); } @Test void compressionNone() { - GrpcExporter exporter = builder.setCompression("none").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - OkHttpGrpcExporter.class, - otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false)); - } finally { - exporter.shutdown(); - } - } - - @Test - void compressionGzip() { - GrpcExporter exporter = builder.setCompression("gzip").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - OkHttpGrpcExporter.class, - otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(true)); - } finally { - exporter.shutdown(); - } - } - - @Test - void compressionEnabledAndDisabled() { - GrpcExporter exporter = - builder.setCompression("gzip").setCompression("none").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - OkHttpGrpcExporter.class, - otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false)); - } finally { - exporter.shutdown(); - } - } + builder.setCompression("none"); - @Test - void compressionDefaultWithChannel() { - GrpcExporter exporter = builder.setChannel(channel).build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - UpstreamGrpcExporter.class, - otlp -> - assertThat(otlp) - .extracting("stub") - .extracting("callOptions.compressorName") - .isEqualTo(Codec.Identity.NONE.getMessageEncoding())); - } finally { - exporter.shutdown(); - } + assertThat(builder).extracting("compressionEnabled").isEqualTo(false); } @Test - void compressionNoneWithChannel() { - GrpcExporter exporter = builder.setChannel(channel).setCompression("none").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - UpstreamGrpcExporter.class, - otlp -> - assertThat(otlp) - .extracting("stub") - .extracting("callOptions.compressorName") - .isEqualTo(Codec.Identity.NONE.getMessageEncoding())); - } finally { - exporter.shutdown(); - } - } + void compressionGzip() { + builder.setCompression("gzip"); - @Test - void compressionGzipWithChannel() { - GrpcExporter exporter = builder.setChannel(channel).setCompression("gzip").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - UpstreamGrpcExporter.class, - otlp -> - assertThat(otlp) - .extracting("stub") - .extracting("callOptions.compressorName") - .isEqualTo(new Codec.Gzip().getMessageEncoding())); - } finally { - exporter.shutdown(); - } + assertThat(builder).extracting("compressionEnabled").isEqualTo(true); } @Test - void compressionEnabledAndDisabledWithChannel() { - GrpcExporter exporter = - builder.setChannel(channel).setCompression("gzip").setCompression("none").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - UpstreamGrpcExporter.class, - otlp -> - assertThat(otlp) - .extracting("stub") - .extracting("callOptions.compressorName") - .isEqualTo(Codec.Identity.NONE.getMessageEncoding())); - } finally { - exporter.shutdown(); - } - } - - private final class TestMarshalerServiceStub - extends MarshalerServiceStub { - - private TestMarshalerServiceStub(Channel channel, CallOptions callOptions) { - super(channel, callOptions); - } - - @Override - protected TestMarshalerServiceStub build(Channel channel, CallOptions callOptions) { - return new TestMarshalerServiceStub(channel, callOptions); - } + void compressionEnabledAndDisabled() { + builder.setCompression("gzip").setCompression("none"); - @Override - public ListenableFuture export(Marshaler request) { - return Futures.immediateVoidFuture(); - } + assertThat(builder).extracting("compressionEnabled").isEqualTo(false); } } diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java new file mode 100644 index 00000000000..f918e782e0d --- /dev/null +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.grpc; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.net.URI; +import org.junit.jupiter.api.Test; + +class GrpcExporterTest { + + @Test + void build_NoGrpcSenderProvider() { + assertThatThrownBy( + () -> + new GrpcExporterBuilder<>( + "exporter", "type", 10, new URI("http://localhost"), null, "/path") + .build()) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "No GrpcSenderProvider found on classpath. Please add dependency on " + + "opentelemetry-exporter-sender-okhttp or opentelemetry-exporter-sender-grpc-upstream"); + } +} diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcStatusUtilTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterUtilTest.java similarity index 77% rename from exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcStatusUtilTest.java rename to exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterUtilTest.java index 2537540e3dc..e9f2c3c24c2 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcStatusUtilTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterUtilTest.java @@ -14,20 +14,20 @@ import java.io.IOException; import org.junit.jupiter.api.Test; -class GrpcStatusUtilTest { +class GrpcExporterUtilTest { @Test void parseMessage() throws Exception { assertThat( - GrpcStatusUtil.getStatusMessage( + GrpcExporterUtil.getStatusMessage( Status.newBuilder().setMessage("test").build().toByteArray())) .isEqualTo("test"); assertThat( - GrpcStatusUtil.getStatusMessage( + GrpcExporterUtil.getStatusMessage( Status.newBuilder().setCode(2).setMessage("test2").build().toByteArray())) .isEqualTo("test2"); assertThat( - GrpcStatusUtil.getStatusMessage( + GrpcExporterUtil.getStatusMessage( Status.newBuilder() .setCode(2) .setMessage("test3") @@ -36,16 +36,16 @@ void parseMessage() throws Exception { .toByteArray())) .isEqualTo("test3"); assertThat( - GrpcStatusUtil.getStatusMessage( + GrpcExporterUtil.getStatusMessage( Status.newBuilder() .setCode(2) .addDetails(Any.newBuilder().setValue(ByteString.copyFromUtf8("any")).build()) .build() .toByteArray())) .isEmpty(); - assertThat(GrpcStatusUtil.getStatusMessage(Status.getDefaultInstance().toByteArray())) + assertThat(GrpcExporterUtil.getStatusMessage(Status.getDefaultInstance().toByteArray())) .isEmpty(); - assertThatThrownBy(() -> GrpcStatusUtil.getStatusMessage(new byte[] {0, 1, 3, 0})) + assertThatThrownBy(() -> GrpcExporterUtil.getStatusMessage(new byte[] {0, 1, 3, 0})) .isInstanceOf(IOException.class); } } diff --git a/exporters/common/src/testGrpcSenderProvider/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java b/exporters/common/src/testGrpcSenderProvider/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java new file mode 100644 index 00000000000..4e2979dd275 --- /dev/null +++ b/exporters/common/src/testGrpcSenderProvider/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java @@ -0,0 +1,160 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.grpc; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.github.netmikey.logunit.api.LogCapturer; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ManagedChannelBuilder; +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.exporter.internal.marshal.Serializer; +import io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSender; +import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender; +import io.opentelemetry.internal.testing.slf4j.SuppressLogger; +import java.net.URI; +import java.net.URISyntaxException; +import javax.annotation.Nullable; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junitpioneer.jupiter.SetSystemProperty; + +class GrpcExporterTest { + + @RegisterExtension + LogCapturer logCapturer = + LogCapturer.create().captureForLogger(GrpcExporterBuilder.class.getName()); + + @Test + @SuppressLogger(GrpcExporterBuilder.class) + void build_multipleSendersNoConfiguration() { + assertThatCode( + () -> + new GrpcExporterBuilder<>( + "exporter", + "type", + 10, + new URI("http://localhost"), + () -> DummyServiceFutureStub::newFutureStub, + "/path") + .setChannel(ManagedChannelBuilder.forTarget("localhost").build()) + .build()) + .doesNotThrowAnyException(); + + logCapturer.assertContains( + "Multiple GrpcSenderProvider found. Please include only one, " + + "or specify preference setting io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider " + + "to the FQCN of the preferred provider."); + } + + @Test + @SetSystemProperty( + key = "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider", + value = + "io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSenderProvider") + void build_multipleSendersWithUpstream() throws URISyntaxException { + assertThat( + new GrpcExporterBuilder<>( + "exporter", + "type", + 10, + new URI("http://localhost"), + () -> DummyServiceFutureStub::newFutureStub, + "/path") + .setChannel(ManagedChannelBuilder.forTarget("localhost").build()) + .build()) + .extracting("grpcSender") + .isInstanceOf(UpstreamGrpcSender.class); + + assertThat(logCapturer.getEvents()).isEmpty(); + } + + @Test + @SetSystemProperty( + key = "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider", + value = "io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSenderProvider") + void build_multipleSendersWithOkHttp() throws URISyntaxException { + assertThat( + new GrpcExporterBuilder<>( + "exporter", + "type", + 10, + new URI("http://localhost"), + () -> DummyServiceFutureStub::newFutureStub, + "/path") + .setChannel(ManagedChannelBuilder.forTarget("localhost").build()) + .build()) + .extracting("grpcSender") + .isInstanceOf(OkHttpGrpcSender.class); + + assertThat(logCapturer.getEvents()).isEmpty(); + } + + @Test + @SetSystemProperty( + key = "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider", + value = "foo") + void build_multipleSendersNoMatch() { + assertThatThrownBy( + () -> + new GrpcExporterBuilder<>( + "exporter", + "type", + 10, + new URI("http://localhost"), + () -> DummyServiceFutureStub::newFutureStub, + "/path") + .setChannel(ManagedChannelBuilder.forTarget("localhost").build()) + .build()) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "No GrpcSenderProvider matched configured io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider: foo"); + + assertThat(logCapturer.getEvents()).isEmpty(); + } + + private static class DummyServiceFutureStub + extends MarshalerServiceStub { + + protected DummyServiceFutureStub(Channel channel, CallOptions callOptions) { + super(channel, callOptions); + } + + @Override + public ListenableFuture export(DummyMarshaler request) { + SettableFuture future = SettableFuture.create(); + future.set(new Object()); + return future; + } + + @Override + protected DummyServiceFutureStub build(Channel channel, CallOptions callOptions) { + return new DummyServiceFutureStub(channel, callOptions); + } + + private static DummyServiceFutureStub newFutureStub( + io.grpc.Channel channel, @Nullable String authorityOverride) { + return DummyServiceFutureStub.newStub( + (c, options) -> new DummyServiceFutureStub(c, options.withAuthority(authorityOverride)), + channel); + } + } + + private static class DummyMarshaler extends MarshalerWithSize { + + protected DummyMarshaler() { + super(0); + } + + @Override + protected void writeTo(Serializer output) {} + } +} diff --git a/exporters/jaeger/build.gradle.kts b/exporters/jaeger/build.gradle.kts index 3ad6907facd..1d37eea41eb 100644 --- a/exporters/jaeger/build.gradle.kts +++ b/exporters/jaeger/build.gradle.kts @@ -16,12 +16,12 @@ dependencies { protoSource(project(":exporters:jaeger-proto")) implementation(project(":exporters:common")) + implementation(project(":exporters:sender:okhttp")) implementation(project(":semconv")) implementation(project(":sdk-extensions:autoconfigure-spi")) compileOnly("io.grpc:grpc-stub") - implementation("com.squareup.okhttp3:okhttp") implementation("com.fasterxml.jackson.jr:jackson-jr-objects") testImplementation(project(":exporters:jaeger-proto")) diff --git a/exporters/jaeger/src/main/java/io/opentelemetry/exporter/jaeger/JaegerGrpcSpanExporterBuilder.java b/exporters/jaeger/src/main/java/io/opentelemetry/exporter/jaeger/JaegerGrpcSpanExporterBuilder.java index 06f8a36caec..e67311dc7e5 100644 --- a/exporters/jaeger/src/main/java/io/opentelemetry/exporter/jaeger/JaegerGrpcSpanExporterBuilder.java +++ b/exporters/jaeger/src/main/java/io/opentelemetry/exporter/jaeger/JaegerGrpcSpanExporterBuilder.java @@ -11,7 +11,6 @@ import io.grpc.ManagedChannel; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; -import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; import java.net.URI; import java.time.Duration; @@ -42,7 +41,7 @@ public final class JaegerGrpcSpanExporterBuilder { JaegerGrpcSpanExporterBuilder() { delegate = - GrpcExporter.builder( + new GrpcExporterBuilder<>( "jaeger", "span", DEFAULT_TIMEOUT_SECS, diff --git a/exporters/jaeger/src/test/java/io/opentelemetry/exporter/jaeger/JaegerGrpcSpanExporterTest.java b/exporters/jaeger/src/test/java/io/opentelemetry/exporter/jaeger/JaegerGrpcSpanExporterTest.java index 2a63201ff06..b26ddc2fda8 100644 --- a/exporters/jaeger/src/test/java/io/opentelemetry/exporter/jaeger/JaegerGrpcSpanExporterTest.java +++ b/exporters/jaeger/src/test/java/io/opentelemetry/exporter/jaeger/JaegerGrpcSpanExporterTest.java @@ -27,7 +27,7 @@ import io.opentelemetry.api.trace.TraceId; import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.exporter.internal.TlsUtil; -import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporter; +import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.jaeger.proto.api_v2.Collector; import io.opentelemetry.exporter.jaeger.proto.api_v2.Model; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; @@ -92,8 +92,7 @@ protected CompletionStage handleMessage( } }; - @RegisterExtension - LogCapturer logs = LogCapturer.create().captureForType(OkHttpGrpcExporter.class); + @RegisterExtension LogCapturer logs = LogCapturer.create().captureForType(GrpcExporter.class); @RegisterExtension static final SelfSignedCertificateExtension serverTls = new SelfSignedCertificateExtension(); @@ -371,7 +370,7 @@ void invalidConfig() { void compressionDefault() { JaegerGrpcSpanExporter exporter = JaegerGrpcSpanExporter.builder().build(); try { - assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(false); + assertThat(exporter).extracting("delegate.grpcSender.compressionEnabled").isEqualTo(false); } finally { exporter.shutdown(); } @@ -382,7 +381,7 @@ void compressionNone() { JaegerGrpcSpanExporter exporter = JaegerGrpcSpanExporter.builder().setCompression("none").build(); try { - assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(false); + assertThat(exporter).extracting("delegate.grpcSender.compressionEnabled").isEqualTo(false); } finally { exporter.shutdown(); } @@ -393,7 +392,7 @@ void compressionGzip() { JaegerGrpcSpanExporter exporter = JaegerGrpcSpanExporter.builder().setCompression("gzip").build(); try { - assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(true); + assertThat(exporter).extracting("delegate.grpcSender.compressionEnabled").isEqualTo(true); } finally { exporter.shutdown(); } @@ -404,14 +403,14 @@ void compressionEnabledAndDisabled() { JaegerGrpcSpanExporter exporter = JaegerGrpcSpanExporter.builder().setCompression("gzip").setCompression("none").build(); try { - assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(false); + assertThat(exporter).extracting("delegate.grpcSender.compressionEnabled").isEqualTo(false); } finally { exporter.shutdown(); } } @Test - @SuppressLogger(OkHttpGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void shutdown() { JaegerGrpcSpanExporter exporter = JaegerGrpcSpanExporter.builder().setEndpoint(server.httpUri().toString()).build(); diff --git a/exporters/jaeger/src/test/java/io/opentelemetry/exporter/jaeger/internal/JaegerGrpcSpanExporterProviderTest.java b/exporters/jaeger/src/test/java/io/opentelemetry/exporter/jaeger/internal/JaegerGrpcSpanExporterProviderTest.java index 6a2598d2305..03776d82add 100644 --- a/exporters/jaeger/src/test/java/io/opentelemetry/exporter/jaeger/internal/JaegerGrpcSpanExporterProviderTest.java +++ b/exporters/jaeger/src/test/java/io/opentelemetry/exporter/jaeger/internal/JaegerGrpcSpanExporterProviderTest.java @@ -33,12 +33,12 @@ void createExporter_Default() { assertThat(spanExporter) .isInstanceOf(io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter.class); assertThat(spanExporter) - .extracting("delegate") + .extracting("delegate.grpcSender") .extracting("client") .extracting("callTimeoutMillis") .isEqualTo(10000); assertThat(spanExporter) - .extracting("delegate") + .extracting("delegate.grpcSender") .extracting("url") .isEqualTo( HttpUrl.get("http://localhost:14250/jaeger.api_v2.CollectorService/PostSpans")); @@ -56,12 +56,12 @@ void createExporter_WithConfiguration() { assertThat(spanExporter) .isInstanceOf(io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter.class); assertThat(spanExporter) - .extracting("delegate") + .extracting("delegate.grpcSender") .extracting("client") .extracting("callTimeoutMillis") .isEqualTo(1000); assertThat(spanExporter) - .extracting("delegate") + .extracting("delegate.grpcSender") .extracting("url") .isEqualTo(HttpUrl.get("http://endpoint:8080/jaeger.api_v2.CollectorService/PostSpans")); } diff --git a/exporters/otlp/all/build.gradle.kts b/exporters/otlp/all/build.gradle.kts index 7152aab29c6..56cb1f2df11 100644 --- a/exporters/otlp/all/build.gradle.kts +++ b/exporters/otlp/all/build.gradle.kts @@ -20,18 +20,16 @@ dependencies { implementation(project(":exporters:sender:okhttp")) implementation(project(":sdk-extensions:autoconfigure-spi")) - implementation("com.squareup.okhttp3:okhttp") - compileOnly("io.grpc:grpc-stub") - testImplementation("io.grpc:grpc-stub") - testImplementation(project(":exporters:otlp:testing-internal")) testImplementation("com.linecorp.armeria:armeria-junit5") testImplementation("com.google.api.grpc:proto-google-common-protos") testImplementation("com.squareup.okhttp3:okhttp-tls") + testImplementation("io.grpc:grpc-stub") jmhImplementation(project(":sdk:testing")) + jmhImplementation(project(":exporters:sender:grpc-managed-channel")) jmhImplementation("com.linecorp.armeria:armeria") jmhImplementation("com.linecorp.armeria:armeria-grpc") jmhImplementation("io.opentelemetry.proto:opentelemetry-proto") @@ -45,27 +43,60 @@ testing { suites { register("testGrpcNetty") { dependencies { + implementation(project(":exporters:sender:grpc-managed-channel")) implementation(project(":exporters:otlp:testing-internal")) implementation("io.grpc:grpc-netty") implementation("io.grpc:grpc-stub") } + targets { + all { + testTask { + systemProperty( + "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider", + "io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSenderProvider" + ) + } + } + } } register("testGrpcNettyShaded") { dependencies { + implementation(project(":exporters:sender:grpc-managed-channel")) implementation(project(":exporters:otlp:testing-internal")) implementation("io.grpc:grpc-netty-shaded") implementation("io.grpc:grpc-stub") } + targets { + all { + testTask { + systemProperty( + "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider", + "io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSenderProvider" + ) + } + } + } } register("testGrpcOkhttp") { dependencies { + implementation(project(":exporters:sender:grpc-managed-channel")) implementation(project(":exporters:otlp:testing-internal")) implementation("io.grpc:grpc-okhttp") implementation("io.grpc:grpc-stub") } + targets { + all { + testTask { + systemProperty( + "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider", + "io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSenderProvider" + ) + } + } + } } register("testJdkHttpSender") { dependencies { @@ -77,7 +108,10 @@ testing { targets { all { testTask { - systemProperty("io.opentelemetry.exporter.internal.http.HttpSenderProvider", "io.opentelemetry.exporter.sender.jdk.internal.JdkHttpSenderProvider") + systemProperty( + "io.opentelemetry.exporter.internal.http.HttpSenderProvider", + "io.opentelemetry.exporter.sender.jdk.internal.JdkHttpSenderProvider" + ) enabled = !testJavaVersion.equals("8") } } diff --git a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java index d06060056c7..9271ad9fc7e 100644 --- a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java +++ b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java @@ -11,15 +11,19 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; +import io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSender; +import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; import io.opentelemetry.sdk.common.CompletableResultCode; import java.net.URI; +import java.util.Collections; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -62,8 +66,8 @@ public void export( private static ManagedChannel defaultGrpcChannel; - private static GrpcExporter defaultGrpcExporter; - private static GrpcExporter okhttpGrpcExporter; + private static GrpcExporter upstreamGrpcExporter; + private static GrpcExporter okhttpGrpcSender; private static HttpExporter httpExporter; @Setup(Level.Trial) @@ -74,26 +78,29 @@ public void setUp() { ManagedChannelBuilder.forAddress("localhost", server.activeLocalPort()) .usePlaintext() .build(); - defaultGrpcExporter = - GrpcExporter.builder( - "otlp", - "span", - 10, - URI.create("http://localhost:" + server.activeLocalPort()), - () -> MarshalerTraceServiceGrpc::newFutureStub, - OtlpGrpcSpanExporterBuilder.GRPC_ENDPOINT_PATH) - .setChannel(defaultGrpcChannel) - .build(); + upstreamGrpcExporter = + new GrpcExporter<>( + "otlp", + "span", + new UpstreamGrpcSender<>( + MarshalerTraceServiceGrpc.newFutureStub(defaultGrpcChannel, null), 10), + MeterProvider::noop); - okhttpGrpcExporter = - GrpcExporter.builder( - "otlp", - "span", + okhttpGrpcSender = + new GrpcExporter<>( + "otlp", + "span", + new OkHttpGrpcSender<>( + URI.create("http://localhost:" + server.activeLocalPort()) + .resolve(OtlpGrpcSpanExporterBuilder.GRPC_ENDPOINT_PATH) + .toString(), + false, 10, - URI.create("http://localhost:" + server.activeLocalPort()), - () -> MarshalerTraceServiceGrpc::newFutureStub, - OtlpGrpcSpanExporterBuilder.GRPC_ENDPOINT_PATH) - .build(); + Collections.emptyMap(), + null, + null, + null), + MeterProvider::noop); httpExporter = new HttpExporterBuilder( @@ -103,8 +110,8 @@ public void setUp() { @TearDown(Level.Trial) public void tearDown() { - defaultGrpcExporter.shutdown().join(10, TimeUnit.SECONDS); - okhttpGrpcExporter.shutdown().join(10, TimeUnit.SECONDS); + upstreamGrpcExporter.shutdown().join(10, TimeUnit.SECONDS); + okhttpGrpcSender.shutdown().join(10, TimeUnit.SECONDS); httpExporter.shutdown().join(10, TimeUnit.SECONDS); defaultGrpcChannel.shutdownNow(); server.stop().join(); @@ -113,7 +120,7 @@ public void tearDown() { @Benchmark public CompletableResultCode defaultGrpcExporter(RequestMarshalState state) { CompletableResultCode result = - defaultGrpcExporter + upstreamGrpcExporter .export(state.traceRequestMarshaler, state.numSpans) .join(10, TimeUnit.SECONDS); if (!result.isSuccess()) { @@ -125,7 +132,7 @@ public CompletableResultCode defaultGrpcExporter(RequestMarshalState state) { @Benchmark public CompletableResultCode okhttpGrpcExporter(RequestMarshalState state) { CompletableResultCode result = - okhttpGrpcExporter + okhttpGrpcSender .export(state.traceRequestMarshaler, state.numSpans) .join(10, TimeUnit.SECONDS); if (!result.isSuccess()) { diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java index 073926f317a..97b193bab3f 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java @@ -11,7 +11,6 @@ import io.grpc.ManagedChannel; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; -import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; @@ -48,7 +47,7 @@ public final class OtlpGrpcLogRecordExporterBuilder { OtlpGrpcLogRecordExporterBuilder() { this( - GrpcExporter.builder( + new GrpcExporterBuilder<>( "otlp", "log", DEFAULT_TIMEOUT_SECS, diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java index 2009c72bba9..5059d111987 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java @@ -10,7 +10,6 @@ import io.grpc.ManagedChannel; import io.opentelemetry.api.metrics.MeterProvider; -import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; @@ -60,7 +59,7 @@ public final class OtlpGrpcMetricExporterBuilder { OtlpGrpcMetricExporterBuilder() { this( - GrpcExporter.builder( + new GrpcExporterBuilder<>( "otlp", "metric", DEFAULT_TIMEOUT_SECS, diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java index e22fc3d4951..9a27f97fc8b 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java @@ -11,7 +11,6 @@ import io.grpc.ManagedChannel; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; -import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; @@ -44,7 +43,7 @@ public final class OtlpGrpcSpanExporterBuilder { OtlpGrpcSpanExporterBuilder() { this( - GrpcExporter.builder( + new GrpcExporterBuilder<>( "otlp", "span", DEFAULT_TIMEOUT_SECS, diff --git a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterTest.java b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterTest.java index d468a46245d..de7845e8980 100644 --- a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterTest.java +++ b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterTest.java @@ -7,13 +7,13 @@ import static org.assertj.core.api.Assertions.assertThat; -import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporter; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.internal.otlp.logs.ResourceLogsMarshaler; import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest; import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil; import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter; import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder; +import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender; import io.opentelemetry.proto.logs.v1.ResourceLogs; import io.opentelemetry.sdk.logs.data.LogRecordData; import java.io.Closeable; @@ -30,7 +30,7 @@ class OtlpGrpcLogRecordExporterTest @Test void usingOkHttp() throws Exception { try (Closeable exporter = OtlpGrpcLogRecordExporter.builder().build()) { - assertThat(exporter).extracting("delegate").isInstanceOf(OkHttpGrpcExporter.class); + assertThat(exporter).extracting("delegate.grpcSender").isInstanceOf(OkHttpGrpcSender.class); } } diff --git a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java index ec02a2b5657..65cbf9320ce 100644 --- a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java +++ b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java @@ -9,13 +9,13 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporter; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler; import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest; import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil; import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter; import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder; +import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentType; @@ -81,7 +81,7 @@ void invalidMetricConfig() { @Test void usingOkHttp() throws Exception { try (Closeable exporter = OtlpGrpcMetricExporter.builder().build()) { - assertThat(exporter).extracting("delegate").isInstanceOf(OkHttpGrpcExporter.class); + assertThat(exporter).extracting("delegate.grpcSender").isInstanceOf(OkHttpGrpcSender.class); } } diff --git a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java index 149e668f8fd..9017cd4584e 100644 --- a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java +++ b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java @@ -7,13 +7,13 @@ import static org.assertj.core.api.Assertions.assertThat; -import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporter; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler; import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest; import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil; import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter; import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder; +import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender; import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.sdk.trace.data.SpanData; import java.io.Closeable; @@ -29,7 +29,7 @@ class OtlpGrpcSpanExporterTest extends AbstractGrpcTelemetryExporterTest handleMessage(ServiceRequestContext ctx, byte[ } } - @RegisterExtension - LogCapturer logs = - LogCapturer.create() - .captureForType(usingOkHttp() ? OkHttpGrpcExporter.class : UpstreamGrpcExporter.class); + @RegisterExtension LogCapturer logs = LogCapturer.create().captureForType(GrpcExporter.class); private final String type; private final U resourceTelemetryInstance; @@ -251,11 +247,13 @@ void compressionWithNone() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpUri().toString()).setCompression("none").build(); try { - // UpstreamGrpcExporter doesn't support compression, so we skip the assertion + // UpstreamGrpcSender doesn't support compression, so we skip the assertion assumeThat(exporter.unwrap()) - .extracting("delegate") - .isNotInstanceOf(UpstreamGrpcExporter.class); - assertThat(exporter.unwrap()).extracting("delegate.compressionEnabled").isEqualTo(false); + .extracting("delegate.grpcSender") + .matches(sender -> sender.getClass().getSimpleName().equals("OkHttpGrpcSender")); + assertThat(exporter.unwrap()) + .extracting("delegate.grpcSender.compressionEnabled") + .isEqualTo(false); } finally { exporter.shutdown(); } @@ -266,11 +264,13 @@ void compressionWithGzip() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpUri().toString()).setCompression("gzip").build(); try { - // UpstreamGrpcExporter doesn't support compression, so we skip the assertion + // UpstreamGrpcSender doesn't support compression, so we skip the assertion assumeThat(exporter.unwrap()) - .extracting("delegate") - .isNotInstanceOf(UpstreamGrpcExporter.class); - assertThat(exporter.unwrap()).extracting("delegate.compressionEnabled").isEqualTo(true); + .extracting("delegate.grpcSender") + .matches(sender -> sender.getClass().getSimpleName().equals("OkHttpGrpcSender")); + assertThat(exporter.unwrap()) + .extracting("delegate.grpcSender.compressionEnabled") + .isEqualTo(true); } finally { exporter.shutdown(); } @@ -352,8 +352,7 @@ void tlsViaSslContext() throws Exception { } @Test - @SuppressLogger(OkHttpGrpcExporter.class) - @SuppressLogger(UpstreamGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void tls_untrusted() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpsUri().toString()).build(); @@ -424,8 +423,7 @@ void deadlineSetPerExport() throws InterruptedException { } @Test - @SuppressLogger(OkHttpGrpcExporter.class) - @SuppressLogger(UpstreamGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void exportAfterShutdown() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpUri().toString()).build(); @@ -440,8 +438,7 @@ void exportAfterShutdown() { } @Test - @SuppressLogger(OkHttpGrpcExporter.class) - @SuppressLogger(UpstreamGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void doubleShutdown() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpUri().toString()).build(); @@ -452,8 +449,7 @@ void doubleShutdown() { } @Test - @SuppressLogger(OkHttpGrpcExporter.class) - @SuppressLogger(UpstreamGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void error() { addGrpcError(13, null); assertThat( @@ -471,8 +467,7 @@ void error() { } @Test - @SuppressLogger(OkHttpGrpcExporter.class) - @SuppressLogger(UpstreamGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void errorWithMessage() { addGrpcError(8, "out of quota"); assertThat( @@ -490,8 +485,7 @@ void errorWithMessage() { } @Test - @SuppressLogger(OkHttpGrpcExporter.class) - @SuppressLogger(UpstreamGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void errorWithEscapedMessage() { addGrpcError(5, "クマ🐻"); assertThat( @@ -509,8 +503,7 @@ void errorWithEscapedMessage() { } @Test - @SuppressLogger(OkHttpGrpcExporter.class) - @SuppressLogger(UpstreamGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void testExport_Unavailable() { addGrpcError(14, null); assertThat( @@ -529,8 +522,7 @@ void testExport_Unavailable() { } @Test - @SuppressLogger(OkHttpGrpcExporter.class) - @SuppressLogger(UpstreamGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void testExport_Unimplemented() { addGrpcError(12, "UNIMPLEMENTED"); assertThat( @@ -570,8 +562,7 @@ void testExport_Unimplemented() { @ParameterizedTest @ValueSource(ints = {1, 4, 8, 10, 11, 14, 15}) - @SuppressLogger(OkHttpGrpcExporter.class) - @SuppressLogger(UpstreamGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void retryableError(int code) { addGrpcError(code, null); @@ -592,8 +583,7 @@ void retryableError(int code) { } @Test - @SuppressLogger(OkHttpGrpcExporter.class) - @SuppressLogger(UpstreamGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void retryableError_tooManyAttempts() { addGrpcError(1, null); addGrpcError(1, null); @@ -616,8 +606,7 @@ void retryableError_tooManyAttempts() { @ParameterizedTest @ValueSource(ints = {2, 3, 5, 6, 7, 9, 12, 13, 16}) - @SuppressLogger(OkHttpGrpcExporter.class) - @SuppressLogger(UpstreamGrpcExporter.class) + @SuppressLogger(GrpcExporter.class) void nonRetryableError(int code) { addGrpcError(code, null); @@ -893,13 +882,4 @@ private TelemetryExporter retryingExporter() { private static void addGrpcError(int code, @Nullable String message) { grpcErrors.add(new ArmeriaStatusException(code, message)); } - - private static boolean usingOkHttp() { - try { - Class.forName("io.grpc.internal.AbstractManagedChannelImplBuilder"); - return false; - } catch (ClassNotFoundException e) { - return true; - } - } } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcLogRecordExporterBuilderWrapper.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcLogRecordExporterBuilderWrapper.java index 65389a2ad18..993b13312f3 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcLogRecordExporterBuilderWrapper.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcLogRecordExporterBuilderWrapper.java @@ -85,8 +85,8 @@ public TelemetryExporterBuilder setRetryPolicy(RetryPolicy retryP @Override @SuppressWarnings("deprecation") // testing deprecated functionality - public TelemetryExporterBuilder setChannel(ManagedChannel channel) { - builder.setChannel(channel); + public TelemetryExporterBuilder setChannel(Object channel) { + builder.setChannel((ManagedChannel) channel); return this; } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcMetricExporterBuilderWrapper.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcMetricExporterBuilderWrapper.java index 97fc626c39c..0cfa4401ab1 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcMetricExporterBuilderWrapper.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcMetricExporterBuilderWrapper.java @@ -85,8 +85,8 @@ public TelemetryExporterBuilder setRetryPolicy(RetryPolicy retryPoli @Override @SuppressWarnings("deprecation") // testing deprecated functionality - public TelemetryExporterBuilder setChannel(ManagedChannel channel) { - builder.setChannel(channel); + public TelemetryExporterBuilder setChannel(Object channel) { + builder.setChannel((ManagedChannel) channel); return this; } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcSpanExporterBuilderWrapper.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcSpanExporterBuilderWrapper.java index 9836268ae6a..bc4a10aa8f1 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcSpanExporterBuilderWrapper.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcSpanExporterBuilderWrapper.java @@ -86,8 +86,8 @@ public TelemetryExporterBuilder setRetryPolicy(RetryPolicy retryPolicy @Override @SuppressWarnings("deprecation") // testing deprecated functionality - public TelemetryExporterBuilder setChannel(ManagedChannel channel) { - builder.setChannel(channel); + public TelemetryExporterBuilder setChannel(Object channel) { + builder.setChannel((ManagedChannel) channel); return this; } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpLogRecordExporterBuilderWrapper.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpLogRecordExporterBuilderWrapper.java index 2dc13d95b56..7e007fa6e7b 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpLogRecordExporterBuilderWrapper.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpLogRecordExporterBuilderWrapper.java @@ -86,7 +86,7 @@ public TelemetryExporterBuilder setRetryPolicy(RetryPolicy retryP } @Override - public TelemetryExporterBuilder setChannel(io.grpc.ManagedChannel channel) { + public TelemetryExporterBuilder setChannel(Object channel) { throw new UnsupportedOperationException("Not implemented"); } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpMetricExporterBuilderWrapper.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpMetricExporterBuilderWrapper.java index 299b7aa016e..e3d9d1cc20e 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpMetricExporterBuilderWrapper.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpMetricExporterBuilderWrapper.java @@ -85,7 +85,7 @@ public TelemetryExporterBuilder setRetryPolicy(RetryPolicy retryPoli } @Override - public TelemetryExporterBuilder setChannel(io.grpc.ManagedChannel channel) { + public TelemetryExporterBuilder setChannel(Object channel) { throw new UnsupportedOperationException("Not implemented"); } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpSpanExporterBuilderWrapper.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpSpanExporterBuilderWrapper.java index 6b63b7f177a..63660c4314b 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpSpanExporterBuilderWrapper.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/HttpSpanExporterBuilderWrapper.java @@ -85,7 +85,7 @@ public TelemetryExporterBuilder setRetryPolicy(RetryPolicy retryPolicy } @Override - public TelemetryExporterBuilder setChannel(io.grpc.ManagedChannel channel) { + public TelemetryExporterBuilder setChannel(Object channel) { throw new UnsupportedOperationException("Not implemented"); } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java index 5560335ba06..79781eb5809 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/ManagedChannelTelemetryExporterBuilder.java @@ -132,7 +132,7 @@ public TelemetryExporterBuilder setRetryPolicy(RetryPolicy retryPolicy) { } @Override - public TelemetryExporterBuilder setChannel(ManagedChannel channel) { + public TelemetryExporterBuilder setChannel(Object channel) { throw new UnsupportedOperationException(); } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporterBuilder.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporterBuilder.java index 014a90f6d06..95b62e88a28 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporterBuilder.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/TelemetryExporterBuilder.java @@ -5,7 +5,6 @@ package io.opentelemetry.exporter.otlp.testing.internal; -import io.grpc.ManagedChannel; import io.opentelemetry.exporter.internal.auth.Authenticator; import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder; import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; @@ -53,7 +52,7 @@ static TelemetryExporterBuilder wrap(OtlpGrpcLogRecordExporterBui TelemetryExporterBuilder setRetryPolicy(RetryPolicy retryPolicy); - TelemetryExporterBuilder setChannel(ManagedChannel channel); + TelemetryExporterBuilder setChannel(Object channel); TelemetryExporter build(); } diff --git a/exporters/sender/grpc-managed-channel/build.gradle.kts b/exporters/sender/grpc-managed-channel/build.gradle.kts new file mode 100644 index 00000000000..d6ceea653c8 --- /dev/null +++ b/exporters/sender/grpc-managed-channel/build.gradle.kts @@ -0,0 +1,16 @@ +plugins { + id("otel.java-conventions") + id("otel.publish-conventions") + + id("otel.animalsniffer-conventions") +} + +description = "OpenTelemetry gRPC Upstream Sender" +otelJava.moduleName.set("io.opentelemetry.exporter.sender.grpc.managedchannel.internal") + +dependencies { + implementation(project(":exporters:common")) + implementation(project(":sdk:common")) + + implementation("io.grpc:grpc-stub") +} diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java new file mode 100644 index 00000000000..2a18eaf2b53 --- /dev/null +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSender.java @@ -0,0 +1,66 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.grpc.managedchannel.internal; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Status; +import io.opentelemetry.exporter.internal.grpc.GrpcResponse; +import io.opentelemetry.exporter.internal.grpc.GrpcSender; +import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A {@link GrpcSender} which uses the upstream grpc-java library. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class UpstreamGrpcSender implements GrpcSender { + + private final MarshalerServiceStub stub; + private final long timeoutNanos; + + /** Creates a new {@link UpstreamGrpcSender}. */ + public UpstreamGrpcSender(MarshalerServiceStub stub, long timeoutNanos) { + this.timeoutNanos = timeoutNanos; + this.stub = stub; + } + + @Override + public void send(T request, Runnable onSuccess, BiConsumer onError) { + MarshalerServiceStub stub = this.stub; + if (timeoutNanos > 0) { + stub = stub.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS); + } + Futures.addCallback( + stub.export(request), + new FutureCallback() { + @Override + public void onSuccess(@Nullable Object unused) { + onSuccess.run(); + } + + @Override + public void onFailure(Throwable t) { + Status status = Status.fromThrowable(t); + onError.accept( + GrpcResponse.create(status.getCode().value(), status.getDescription()), t); + } + }, + MoreExecutors.directExecutor()); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } +} diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java new file mode 100644 index 00000000000..b560712351f --- /dev/null +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java @@ -0,0 +1,71 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.grpc.managedchannel.internal; + +import io.grpc.Channel; +import io.grpc.ClientInterceptors; +import io.grpc.Codec; +import io.grpc.Metadata; +import io.grpc.stub.MetadataUtils; +import io.opentelemetry.exporter.internal.grpc.GrpcSender; +import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider; +import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.net.URI; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.net.ssl.X509TrustManager; + +/** + * {@link GrpcSender} SPI implementation for {@link UpstreamGrpcSender}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class UpstreamGrpcSenderProvider implements GrpcSenderProvider { + + @Override + public GrpcSender createSender( + URI endpoint, + String endpointPath, + boolean compressionEnabled, + long timeoutNanos, + Map headers, + @Nullable Object managedChannel, + Supplier>> stubFactory, + @Nullable RetryPolicy retryPolicy, + @Nullable SSLContext sslContext, + @Nullable X509TrustManager trustManager) { + Metadata metadata = new Metadata(); + String authorityOverride = null; + for (Map.Entry entry : headers.entrySet()) { + String name = entry.getKey(); + String value = entry.getValue(); + if (name.equals("host")) { + authorityOverride = value; + continue; + } + metadata.put(Metadata.Key.of(name, Metadata.ASCII_STRING_MARSHALLER), value); + } + + Channel channel = + ClientInterceptors.intercept( + (Channel) managedChannel, MetadataUtils.newAttachHeadersInterceptor(metadata)); + + Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; + MarshalerServiceStub stub = + stubFactory + .get() + .apply(channel, authorityOverride) + .withCompression(codec.getMessageEncoding()); + + return new UpstreamGrpcSender<>(stub, timeoutNanos); + } +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/package-info.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/package-info.java similarity index 66% rename from exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/package-info.java rename to exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/package-info.java index fa262be08bb..42050312d36 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/package-info.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/package-info.java @@ -3,8 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -/** Utilities for HTTP exporters. */ @ParametersAreNonnullByDefault -package io.opentelemetry.exporter.internal.okhttp; +package io.opentelemetry.exporter.sender.grpc.managedchannel.internal; import javax.annotation.ParametersAreNonnullByDefault; diff --git a/exporters/sender/grpc-managed-channel/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider b/exporters/sender/grpc-managed-channel/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider new file mode 100644 index 00000000000..fabb70d11a9 --- /dev/null +++ b/exporters/sender/grpc-managed-channel/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider @@ -0,0 +1 @@ +io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSenderProvider diff --git a/exporters/sender/okhttp/build.gradle.kts b/exporters/sender/okhttp/build.gradle.kts index c8908b9e3b2..ef2c6c59c5f 100644 --- a/exporters/sender/okhttp/build.gradle.kts +++ b/exporters/sender/okhttp/build.gradle.kts @@ -14,5 +14,7 @@ dependencies { implementation("com.squareup.okhttp3:okhttp") + compileOnly("io.grpc:grpc-stub") + testImplementation("com.linecorp.armeria:armeria-junit5") } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcRequestBody.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/GrpcRequestBody.java similarity index 97% rename from exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcRequestBody.java rename to exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/GrpcRequestBody.java index 0fb28914a80..ca9191360db 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcRequestBody.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/GrpcRequestBody.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.exporter.internal.grpc; +package io.opentelemetry.exporter.sender.okhttp.internal; import io.opentelemetry.exporter.internal.marshal.Marshaler; import java.io.IOException; diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/OkHttpGrpcExporter.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java similarity index 56% rename from exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/OkHttpGrpcExporter.java rename to exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index 2178f47fc51..5da750edead 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/OkHttpGrpcExporter.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -21,104 +21,108 @@ * limitations under the License. */ -package io.opentelemetry.exporter.internal.grpc; +package io.opentelemetry.exporter.sender.okhttp.internal; -import io.opentelemetry.api.metrics.MeterProvider; -import io.opentelemetry.exporter.internal.ExporterMetrics; +import io.opentelemetry.exporter.internal.RetryUtil; +import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil; +import io.opentelemetry.exporter.internal.grpc.GrpcResponse; +import io.opentelemetry.exporter.internal.grpc.GrpcSender; import io.opentelemetry.exporter.internal.marshal.Marshaler; -import io.opentelemetry.exporter.internal.retry.RetryUtil; import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.internal.ThrottlingLogger; +import io.opentelemetry.sdk.common.export.RetryPolicy; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; -import java.util.logging.Level; -import java.util.logging.Logger; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.function.BiConsumer; import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.net.ssl.X509TrustManager; import okhttp3.Call; import okhttp3.Callback; import okhttp3.Headers; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; +import okhttp3.Protocol; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; /** - * A {@link GrpcExporter} which uses OkHttp instead of grpc-java. + * A {@link GrpcSender} which uses OkHttp instead of grpc-java. * *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class OkHttpGrpcExporter implements GrpcExporter { +public final class OkHttpGrpcSender implements GrpcSender { private static final String GRPC_STATUS = "grpc-status"; private static final String GRPC_MESSAGE = "grpc-message"; - private static final Logger internalLogger = Logger.getLogger(OkHttpGrpcExporter.class.getName()); - - private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); - - // We only log unimplemented once since it's a configuration issue that won't be recovered. - private final AtomicBoolean loggedUnimplemented = new AtomicBoolean(); - private final AtomicBoolean isShutdown = new AtomicBoolean(); - - private final String type; - private final ExporterMetrics exporterMetrics; private final OkHttpClient client; private final HttpUrl url; private final Headers headers; private final boolean compressionEnabled; - /** Creates a new {@link OkHttpGrpcExporter}. */ - OkHttpGrpcExporter( - String exporterName, - String type, - OkHttpClient client, - Supplier meterProviderSupplier, + /** Creates a new {@link OkHttpGrpcSender}. */ + public OkHttpGrpcSender( String endpoint, - Headers headers, - boolean compressionEnabled) { - this.type = type; - this.exporterMetrics = - ExporterMetrics.createGrpcOkHttp(exporterName, type, meterProviderSupplier); - this.client = client; + boolean compressionEnabled, + long timeoutNanos, + Map headers, + @Nullable RetryPolicy retryPolicy, + @Nullable SSLContext sslContext, + @Nullable X509TrustManager trustManager) { + OkHttpClient.Builder clientBuilder = + new OkHttpClient.Builder() + .dispatcher(OkHttpUtil.newDispatcher()) + .callTimeout(Duration.ofNanos(timeoutNanos)); + if (retryPolicy != null) { + clientBuilder.addInterceptor( + new RetryInterceptor(retryPolicy, OkHttpGrpcSender::isRetryable)); + } + if (sslContext != null && trustManager != null) { + clientBuilder.sslSocketFactory(sslContext.getSocketFactory(), trustManager); + } + if (endpoint.startsWith("http://")) { + clientBuilder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE)); + } else { + clientBuilder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1)); + } + this.client = clientBuilder.build(); + + Headers.Builder headersBuilder = new Headers.Builder(); + headers.forEach(headersBuilder::add); + headersBuilder.add("te", "trailers"); + if (compressionEnabled) { + headersBuilder.add("grpc-encoding", "gzip"); + } + this.headers = headersBuilder.build(); this.url = HttpUrl.get(endpoint); - this.headers = headers; this.compressionEnabled = compressionEnabled; } @Override - public CompletableResultCode export(T exportRequest, int numItems) { - if (isShutdown.get()) { - return CompletableResultCode.ofFailure(); - } - - exporterMetrics.addSeen(numItems); - + public void send(T request, Runnable onSuccess, BiConsumer onError) { Request.Builder requestBuilder = new Request.Builder().url(url).headers(headers); - RequestBody requestBody = new GrpcRequestBody(exportRequest, compressionEnabled); + RequestBody requestBody = new GrpcRequestBody(request, compressionEnabled); requestBuilder.post(requestBody); - CompletableResultCode result = new CompletableResultCode(); - client .newCall(requestBuilder.build()) .enqueue( new Callback() { @Override public void onFailure(Call call, IOException e) { - exporterMetrics.addFailed(numItems); - logger.log( - Level.SEVERE, - "Failed to export " - + type - + "s. The request could not be executed. Full error message: " - + e.getMessage()); - result.fail(); + String description = e.getMessage(); + if (description == null) { + description = ""; + } + onError.accept(GrpcResponse.create(2 /* UNKNOWN */, description), e); } @Override @@ -127,58 +131,32 @@ public void onResponse(Call call, Response response) { try { response.body().bytes(); } catch (IOException e) { - logger.log( - Level.WARNING, - "Failed to export " + type + "s, could not consume server response.", + onError.accept( + GrpcResponse.create( + GrpcExporterUtil.GRPC_STATUS_UNKNOWN, + "Could not consume server response."), e); - exporterMetrics.addFailed(numItems); - result.fail(); return; } String status = grpcStatus(response); if ("0".equals(status)) { - exporterMetrics.addSuccess(numItems); - result.succeed(); + onSuccess.run(); return; } - exporterMetrics.addFailed(numItems); - - String codeMessage = - status != null - ? "gRPC status code " + status - : "HTTP status code " + response.code(); String errorMessage = grpcMessage(response); - - if (GrpcStatusUtil.GRPC_STATUS_UNIMPLEMENTED.equals(status)) { - if (loggedUnimplemented.compareAndSet(false, true)) { - GrpcExporterUtil.logUnimplemented(internalLogger, type, errorMessage); - } - } else if (GrpcStatusUtil.GRPC_STATUS_UNAVAILABLE.equals(status)) { - logger.log( - Level.SEVERE, - "Failed to export " - + type - + "s. Server is UNAVAILABLE. " - + "Make sure your collector is running and reachable from this network. " - + "Full error message:" - + errorMessage); - } else { - logger.log( - Level.WARNING, - "Failed to export " - + type - + "s. Server responded with " - + codeMessage - + ". Error message: " - + errorMessage); + int statusCode; + try { + statusCode = Integer.parseInt(status); + } catch (NumberFormatException ex) { + statusCode = GrpcExporterUtil.GRPC_STATUS_UNKNOWN; } - result.fail(); + onError.accept( + GrpcResponse.create(statusCode, errorMessage), + new IllegalStateException(errorMessage)); } }); - - return result; } @Nullable @@ -214,10 +192,6 @@ private static String grpcMessage(Response response) { @Override public CompletableResultCode shutdown() { - if (!isShutdown.compareAndSet(false, true)) { - logger.log(Level.INFO, "Calling shutdown() multiple times."); - return CompletableResultCode.ofSuccess(); - } client.dispatcher().cancelAll(); client.dispatcher().executorService().shutdownNow(); client.connectionPool().evictAll(); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java new file mode 100644 index 00000000000..c1cbb3be664 --- /dev/null +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java @@ -0,0 +1,51 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import io.grpc.Channel; +import io.opentelemetry.exporter.internal.grpc.GrpcSender; +import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider; +import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.net.URI; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Supplier; +import javax.net.ssl.SSLContext; +import javax.net.ssl.X509TrustManager; +import org.jetbrains.annotations.Nullable; + +/** + * {@link GrpcSender} SPI implementation for {@link OkHttpGrpcSender}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class OkHttpGrpcSenderProvider implements GrpcSenderProvider { + + @Override + public GrpcSender createSender( + URI endpoint, + String endpointPath, + boolean compressionEnabled, + long timeoutNanos, + Map headers, + @Nullable Object managedChannel, + Supplier>> stubFactory, + @Nullable RetryPolicy retryPolicy, + @Nullable SSLContext sslContext, + @Nullable X509TrustManager trustManager) { + return new OkHttpGrpcSender<>( + endpoint.resolve(endpointPath).toString(), + compressionEnabled, + timeoutNanos, + headers, + retryPolicy, + sslContext, + trustManager); + } +} diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index 7c2089202f0..306c7b97163 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -5,11 +5,9 @@ package io.opentelemetry.exporter.sender.okhttp.internal; +import io.opentelemetry.exporter.internal.RetryUtil; import io.opentelemetry.exporter.internal.auth.Authenticator; import io.opentelemetry.exporter.internal.http.HttpSender; -import io.opentelemetry.exporter.internal.okhttp.OkHttpUtil; -import io.opentelemetry.exporter.internal.retry.RetryInterceptor; -import io.opentelemetry.exporter.internal.retry.RetryUtil; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.io.IOException; diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpUtil.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java similarity index 94% rename from exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpUtil.java rename to exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java index 419acf2df1d..8aef6b7c21a 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpUtil.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.exporter.internal.okhttp; +package io.opentelemetry.exporter.sender.okhttp.internal; import io.opentelemetry.sdk.internal.DaemonThreadFactory; import java.util.concurrent.SynchronousQueue; diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/RetryInterceptor.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java similarity index 98% rename from exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/RetryInterceptor.java rename to exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java index fc8f64942f0..ee7d5fc9177 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/RetryInterceptor.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.exporter.internal.retry; +package io.opentelemetry.exporter.sender.okhttp.internal; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.io.IOException; diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/package-info.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/package-info.java index 520a8f152f2..b623f8855dc 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/package-info.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/package-info.java @@ -3,7 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -/** Utilities for HTTP exporters. */ @ParametersAreNonnullByDefault package io.opentelemetry.exporter.sender.okhttp.internal; diff --git a/exporters/sender/okhttp/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider b/exporters/sender/okhttp/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider new file mode 100644 index 00000000000..0d308266d75 --- /dev/null +++ b/exporters/sender/okhttp/src/main/resources/META-INF/services/io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider @@ -0,0 +1 @@ +io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSenderProvider diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/retry/RetryInterceptorTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java similarity index 99% rename from exporters/common/src/test/java/io/opentelemetry/exporter/internal/retry/RetryInterceptorTest.java rename to exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java index 4aa630508c2..f4b644a36f3 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/retry/RetryInterceptorTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.exporter.internal.retry; +package io.opentelemetry.exporter.sender.okhttp.internal; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; diff --git a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java index ae3588c776c..099db317838 100644 --- a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java @@ -72,7 +72,9 @@ void configureOtlpTimeout() { .isInstanceOfSatisfying( OtlpGrpcSpanExporter.class, otlp -> - assertThat(otlp).extracting("delegate.client.callTimeoutMillis").isEqualTo(10)); + assertThat(otlp) + .extracting("delegate.grpcSender.client.callTimeoutMillis") + .isEqualTo(10)); } } } diff --git a/sdk-extensions/jaeger-remote-sampler/build.gradle.kts b/sdk-extensions/jaeger-remote-sampler/build.gradle.kts index 90334c112b5..fca0210b6a1 100644 --- a/sdk-extensions/jaeger-remote-sampler/build.gradle.kts +++ b/sdk-extensions/jaeger-remote-sampler/build.gradle.kts @@ -16,6 +16,7 @@ dependencies { implementation(project(":sdk:all")) implementation(project(":exporters:common")) + implementation(project(":exporters:sender:okhttp")) implementation("com.squareup.okhttp3:okhttp") diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java index c2327b6067e..4cb4f6f4970 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java +++ b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/JaegerRemoteSamplerBuilder.java @@ -11,7 +11,7 @@ import io.opentelemetry.api.internal.Utils; import io.opentelemetry.exporter.internal.ExporterBuilderUtil; import io.opentelemetry.exporter.internal.TlsConfigHelper; -import io.opentelemetry.exporter.internal.okhttp.OkHttpUtil; +import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpUtil; import io.opentelemetry.sdk.trace.samplers.Sampler; import java.net.URI; import java.time.Duration; diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/OkHttpGrpcService.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/OkHttpGrpcService.java index 9b32aeefb26..02cc43869cb 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/OkHttpGrpcService.java +++ b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/OkHttpGrpcService.java @@ -5,9 +5,9 @@ package io.opentelemetry.sdk.extension.trace.jaeger.sampler; -import io.opentelemetry.exporter.internal.grpc.GrpcRequestBody; -import io.opentelemetry.exporter.internal.grpc.GrpcStatusUtil; -import io.opentelemetry.exporter.internal.retry.RetryUtil; +import io.opentelemetry.exporter.internal.RetryUtil; +import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil; +import io.opentelemetry.exporter.sender.okhttp.internal.GrpcRequestBody; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -93,7 +93,7 @@ public SamplingStrategyResponseUnMarshaler execute( status != null ? "gRPC status code " + status : "HTTP status code " + response.code(); String errorMessage = grpcMessage(response); - if (GrpcStatusUtil.GRPC_STATUS_UNIMPLEMENTED.equals(status)) { + if (String.valueOf(GrpcExporterUtil.GRPC_STATUS_UNIMPLEMENTED).equals(status)) { logger.log( Level.SEVERE, "Failed to execute " @@ -101,7 +101,7 @@ public SamplingStrategyResponseUnMarshaler execute( + "s. Server responded with UNIMPLEMENTED. " + "Full error message: " + errorMessage); - } else if (GrpcStatusUtil.GRPC_STATUS_UNAVAILABLE.equals(status)) { + } else if (String.valueOf(GrpcExporterUtil.GRPC_STATUS_UNAVAILABLE).equals(status)) { logger.log( Level.SEVERE, "Failed to execute " diff --git a/settings.gradle.kts b/settings.gradle.kts index 00e6ed87454..edd30452d53 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -34,6 +34,7 @@ include(":extensions:incubator") include(":extensions:kotlin") include(":extensions:trace-propagators") include(":exporters:common") +include(":exporters:sender:grpc-managed-channel") include(":exporters:sender:jdk") include(":exporters:sender:okhttp") include(":exporters:jaeger")