From 8f8bcb374ffab7a885d74c7e6d003ecdad514bf9 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 27 Aug 2024 18:38:28 +0000 Subject: [PATCH 1/8] Add opt-in flag and ClientInterceptor to propagate trace context for Spanner server side tracing --- .../clirr-ignored-differences.xml | 7 + .../google/cloud/spanner/SpannerOptions.java | 36 ++++ .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 7 + .../spi/v1/SpannerInterceptorProvider.java | 8 + .../spi/v1/SpannerMetadataProvider.java | 7 + .../spi/v1/TraceContextInterceptor.java | 73 ++++++++ .../cloud/spanner/SpannerOptionsHelper.java | 29 +++ .../cloud/spanner/SpannerOptionsTest.java | 18 ++ .../spanner/spi/v1/GapicSpannerRpcTest.java | 170 ++++++++++++++++++ .../spi/v1/SpannerMetadataProviderTest.java | 11 ++ 10 files changed, 366 insertions(+) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsHelper.java diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 9433fcba5ad..863afe6a3d8 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -695,6 +695,13 @@ boolean isEnableApiTracing() + + + 7012 + com/google/cloud/spanner/SpannerOptions$SpannerEnvironment + boolean isEnableServerSideTracing() + + 7012 diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 3a8632e2ebe..aa9347f59bb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -158,6 +158,7 @@ public class SpannerOptions extends ServiceOptions { private final OpenTelemetry openTelemetry; private final boolean 
enableApiTracing; private final boolean enableExtendedTracing; + private final boolean enableServerSideTracing; enum TracingFramework { OPEN_CENSUS, @@ -664,6 +665,7 @@ protected SpannerOptions(Builder builder) { openTelemetry = builder.openTelemetry; enableApiTracing = builder.enableApiTracing; enableExtendedTracing = builder.enableExtendedTracing; + enableServerSideTracing = builder.enableServerSideTracing; } /** @@ -696,6 +698,10 @@ default boolean isEnableExtendedTracing() { default boolean isEnableApiTracing() { return false; } + + default boolean isEnableServerSideTracing() { + return false; + } } /** @@ -709,6 +715,8 @@ private static class SpannerEnvironmentImpl implements SpannerEnvironment { "SPANNER_OPTIMIZER_STATISTICS_PACKAGE"; private static final String SPANNER_ENABLE_EXTENDED_TRACING = "SPANNER_ENABLE_EXTENDED_TRACING"; private static final String SPANNER_ENABLE_API_TRACING = "SPANNER_ENABLE_API_TRACING"; + private static final String SPANNER_ENABLE_SERVER_SIDE_TRACING = + "SPANNER_ENABLE_SERVER_SIDE_TRACING"; private SpannerEnvironmentImpl() {} @@ -734,6 +742,11 @@ public boolean isEnableExtendedTracing() { public boolean isEnableApiTracing() { return Boolean.parseBoolean(System.getenv(SPANNER_ENABLE_API_TRACING)); } + + @Override + public boolean isEnableServerSideTracing() { + return Boolean.parseBoolean(System.getenv(SPANNER_ENABLE_SERVER_SIDE_TRACING)); + } } /** Builder for {@link SpannerOptions} instances. 
*/ @@ -797,6 +810,8 @@ public static class Builder private OpenTelemetry openTelemetry; private boolean enableApiTracing = SpannerOptions.environment.isEnableApiTracing(); private boolean enableExtendedTracing = SpannerOptions.environment.isEnableExtendedTracing(); + private boolean enableServerSideTracing = + SpannerOptions.environment.isEnableServerSideTracing(); private static String createCustomClientLibToken(String token) { return token + " " + ServiceOptions.getGoogApiClientLibName(); @@ -862,6 +877,7 @@ protected Builder() { this.useVirtualThreads = options.useVirtualThreads; this.enableApiTracing = options.enableApiTracing; this.enableExtendedTracing = options.enableExtendedTracing; + this.enableServerSideTracing = options.enableServerSideTracing; } @Override @@ -1389,6 +1405,17 @@ public Builder setEnableExtendedTracing(boolean enableExtendedTracing) { return this; } + /** + * Sets whether to enable Spanner server side tracing. Enabling this option will create the + * trace spans at the Spanner layer. By default, server side tracing is disabled. Enabling + * server side tracing requires OpenTelemetry to be set up properly. Simply enabling this option + * won't generate server side traces. + */ + public Builder setEnableServerSideTracing(boolean enableServerSideTracing) { + this.enableServerSideTracing = enableServerSideTracing; + return this; + } + @SuppressWarnings("rawtypes") @Override public SpannerOptions build() { @@ -1478,6 +1505,7 @@ public static void enableOpenCensusTraces() { */ @ObsoleteApi( "The OpenCensus project is deprecated. Use enableOpenTelemetryTraces to switch to OpenTelemetry traces") + @VisibleForTesting static void resetActiveTracingFramework() { activeTracingFramework = null; } @@ -1679,6 +1707,14 @@ public boolean isEnableExtendedTracing() { return enableExtendedTracing; } + /** + * Returns whether Spanner server side tracing is enabled. If this option is enabled then trace + * spans will be created at the Spanner layer. 
+ */ + public boolean isServerSideTracingEnabled() { + return enableServerSideTracing; + } + /** Returns the default query options to use for the specific database. */ public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) { // Use the specific query options for the database if any have been specified. These have diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index e1e15b851b4..1b3183ead90 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -274,6 +274,7 @@ public class GapicSpannerRpc implements SpannerRpc { private final boolean leaderAwareRoutingEnabled; private final int numChannels; private final boolean isGrpcGcpExtensionEnabled; + private final boolean serverSideTracingEnabled; public static GapicSpannerRpc create(SpannerOptions options) { return new GapicSpannerRpc(options); @@ -327,6 +328,7 @@ public GapicSpannerRpc(final SpannerOptions options) { this.leaderAwareRoutingEnabled = options.isLeaderAwareRoutingEnabled(); this.numChannels = options.getNumChannels(); this.isGrpcGcpExtensionEnabled = options.isGrpcGcpExtensionEnabled(); + this.serverSideTracingEnabled = options.isServerSideTracingEnabled(); if (initializeStubs) { // First check if SpannerOptions provides a TransportChannelProvider. Create one @@ -350,6 +352,8 @@ public GapicSpannerRpc(final SpannerOptions options) { MoreObjects.firstNonNull( options.getInterceptorProvider(), SpannerInterceptorProvider.createDefault(options.getOpenTelemetry()))) + // This sets the trace context headers. + .withTraceContext(serverSideTracingEnabled, options.getOpenTelemetry()) // This sets the response compressor (Server -> Client). 
.withEncoding(compressorName)) .setHeaderProvider(headerProviderWithUserAgent) @@ -1992,6 +1996,9 @@ GrpcCallContext newCallContext( if (routeToLeader && leaderAwareRoutingEnabled) { context = context.withExtraHeaders(metadataProvider.newRouteToLeaderHeader()); } + if (serverSideTracingEnabled) { + context = context.withExtraHeaders(metadataProvider.newServerSideTracingHeader()); + } if (callCredentialsProvider != null) { CallCredentials callCredentials = callCredentialsProvider.getCallCredentials(); if (callCredentials != null) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java index 9b1a2fd3c1f..bc20e815e95 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java @@ -74,6 +74,14 @@ SpannerInterceptorProvider withEncoding(String encoding) { return this; } + SpannerInterceptorProvider withTraceContext( + boolean serverSideTracingEnabled, OpenTelemetry openTelemetry) { + if (serverSideTracingEnabled) { + return with(new TraceContextInterceptor(openTelemetry)); + } + return this; + } + @Override public List getInterceptors() { return clientInterceptors; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java index 0b8d76d52df..611383942ca 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java @@ -37,6 +37,7 @@ class SpannerMetadataProvider { private final Map, String> headers; private final String resourceHeaderKey; private static final String 
ROUTE_TO_LEADER_HEADER_KEY = "x-goog-spanner-route-to-leader"; + private static final String SERVER_SIDE_TRACING_HEADER_KEY = "x-goog-spanner-end-to-end-tracing"; private static final Pattern[] RESOURCE_TOKEN_PATTERNS = { Pattern.compile("^(?projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"), Pattern.compile("^(?projects/[^/]*/instances/[^/]*)(.*)?") @@ -44,6 +45,8 @@ class SpannerMetadataProvider { private static final Map> ROUTE_TO_LEADER_HEADER_MAP = ImmutableMap.of(ROUTE_TO_LEADER_HEADER_KEY, Collections.singletonList("true")); + private static final Map> SERVER_SIDE_TRACING_HEADER_MAP = + ImmutableMap.of(SERVER_SIDE_TRACING_HEADER_KEY, Collections.singletonList("true")); private SpannerMetadataProvider(Map headers, String resourceHeaderKey) { this.resourceHeaderKey = resourceHeaderKey; @@ -89,6 +92,10 @@ Map> newRouteToLeaderHeader() { return ROUTE_TO_LEADER_HEADER_MAP; } + Map> newServerSideTracingHeader() { + return SERVER_SIDE_TRACING_HEADER_MAP; + } + private Map, String> constructHeadersAsMetadata( Map headers) { ImmutableMap.Builder, String> headersAsMetadataBuilder = diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java new file mode 100644 index 00000000000..4280e310355 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java @@ -0,0 +1,73 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner.spi.v1; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapSetter; + +/** + * Intercepts all gRPC calls and injects trace context related headers to propagate trace context to + * Spanner. This class takes reference from OpenTelemetry's JAVA instrumentation library for gRPC. + * https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/9ecf7965aa455d41ea8cc0761b6c6b6eeb106324/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java#L27 + */ +public class TraceContextInterceptor implements ClientInterceptor { + + private final TextMapPropagator textMapPropagator; + + public TraceContextInterceptor(OpenTelemetry openTelemetry) { + this.textMapPropagator = openTelemetry.getPropagators().getTextMapPropagator(); + } + + enum MetadataSetter implements TextMapSetter { + INSTANCE; + + @SuppressWarnings("null") + @Override + public void set(Metadata carrier, String key, String value) { + carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); + } + } + + private static final class NoopSimpleForwardingClientCallListener + extends SimpleForwardingClientCallListener { + public NoopSimpleForwardingClientCallListener(ClientCall.Listener responseListener) { + super(responseListener); + } + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, 
CallOptions callOptions, Channel next) { + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + Context parentContext = Context.current(); + textMapPropagator.inject(parentContext, headers, MetadataSetter.INSTANCE); + super.start(new NoopSimpleForwardingClientCallListener(responseListener), headers); + } + }; + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsHelper.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsHelper.java new file mode 100644 index 00000000000..db02c625099 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsHelper.java @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +/** Helper to configure SpannerOptions for tests. */ +public class SpannerOptionsHelper { + + /** + * Resets the activeTracingFramework. This variable is used for internal testing, and is not a + * valid production scenario. 
+ */ + public static void resetActiveTracingFramework() { + SpannerOptions.resetActiveTracingFramework(); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 73455f06688..7f893b0499a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -736,6 +736,24 @@ public void testLeaderAwareRoutingEnablement() { .isLeaderAwareRoutingEnabled()); } + @Test + public void testServerSideTracingEnablement() { + // Test that end to end tracing is disabled by default. + assertFalse(SpannerOptions.newBuilder().setProjectId("p").build().isServerSideTracingEnabled()); + assertTrue( + SpannerOptions.newBuilder() + .setProjectId("p") + .setEnableServerSideTracing(true) + .build() + .isServerSideTracingEnabled()); + assertFalse( + SpannerOptions.newBuilder() + .setProjectId("p") + .setEnableServerSideTracing(false) + .build() + .isServerSideTracingEnabled()); + } + @Test public void testSetDirectedReadOptions() { final DirectedReadOptions directedReadOptions = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index 42a07ed9ea6..6b91f1f6cd5 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -47,6 +47,7 @@ import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator; +import com.google.cloud.spanner.SpannerOptionsHelper; import com.google.cloud.spanner.Statement; import 
com.google.cloud.spanner.TransactionRunner; import com.google.cloud.spanner.spi.v1.GapicSpannerRpc.AdminRequestsLimitExceededRetryAlgorithm; @@ -76,6 +77,12 @@ import io.grpc.auth.MoreCallCredentials; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.protobuf.lite.ProtoLiteUtils; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.samplers.Sampler; import java.io.IOException; import java.net.InetSocketAddress; import java.util.HashMap; @@ -148,6 +155,8 @@ public class GapicSpannerRpcTest { private static String defaultUserAgent; private static Spanner spanner; private static boolean isRouteToLeader; + private static boolean isServerSideTracing; + private static boolean isTraceContextPresent; @Parameter public Dialect dialect; @@ -158,6 +167,10 @@ public static Object[] data() { @Before public void startServer() throws IOException { + // Enable OpenTelemetry tracing. 
+ SpannerOptionsHelper.resetActiveTracingFramework(); + SpannerOptions.enableOpenTelemetryTraces(); + assumeTrue( "Skip tests when emulator is enabled as this test interferes with the check whether the emulator is running", System.getenv("SPANNER_EMULATOR_HOST") == null); @@ -194,13 +207,24 @@ public ServerCall.Listener interceptCall( if (call.getMethodDescriptor() .equals(SpannerGrpc.getExecuteStreamingSqlMethod()) || call.getMethodDescriptor().equals(SpannerGrpc.getExecuteSqlMethod())) { + String traceParentHeader = + headers.get(Key.of("traceparent", Metadata.ASCII_STRING_MARSHALLER)); + isTraceContextPresent = (traceParentHeader != null); String routeToLeaderHeader = headers.get( Key.of( "x-goog-spanner-route-to-leader", Metadata.ASCII_STRING_MARSHALLER)); + String serverSideTracingHeader = + headers.get( + Key.of( + "x-goog-spanner-end-to-end-tracing", + Metadata.ASCII_STRING_MARSHALLER)); isRouteToLeader = (routeToLeaderHeader != null && routeToLeaderHeader.equals("true")); + isServerSideTracing = + (serverSideTracingHeader != null + && serverSideTracingHeader.equals("true")); } return Contexts.interceptCall(Context.current(), call, headers, next); } @@ -224,6 +248,8 @@ public void reset() throws InterruptedException { server.awaitTermination(); } isRouteToLeader = false; + isServerSideTracing = false; + isTraceContextPresent = false; } @Test @@ -464,6 +490,83 @@ public void testNewCallContextWithRouteToLeaderHeaderAndLarDisabled() { rpc.shutdown(); } + @Test + public void testNewCallContextWithServerSideTracingHeader() { + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("some-project") + .setEnableServerSideTracing(true) + .build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + GrpcCallContext callContext = + rpc.newCallContext( + optionsMap, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod()); + assertNotNull(callContext); + assertEquals( + ImmutableList.of("true"), + 
callContext.getExtraHeaders().get("x-goog-spanner-end-to-end-tracing")); + assertEquals( + ImmutableList.of("projects/some-project"), + callContext.getExtraHeaders().get(ApiClientHeaderProvider.getDefaultResourceHeaderKey())); + rpc.shutdown(); + } + + @Test + public void testNewCallContextWithoutServerSideTracingHeader() { + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("some-project") + .setEnableServerSideTracing(false) + .build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + GrpcCallContext callContext = + rpc.newCallContext( + optionsMap, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod()); + assertNotNull(callContext); + assertNull(callContext.getExtraHeaders().get("x-goog-spanner-end-to-end-tracing")); + rpc.shutdown(); + } + + @Test + public void testServerSideTracingHeaderWithEnabledTracing() { + final SpannerOptions options = + createSpannerOptions().toBuilder().setEnableServerSideTracing(true).build(); + try (Spanner spanner = options.getService()) { + final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + TransactionRunner runner = databaseClient.readWriteTransaction(); + runner.run( + transaction -> { + transaction.executeUpdate(UPDATE_FOO_STATEMENT); + return null; + }); + } + assertTrue(isServerSideTracing); + } + + @Test + public void testServerSideTracingHeaderWithDisabledTracing() { + final SpannerOptions options = + createSpannerOptions().toBuilder().setEnableServerSideTracing(false).build(); + try (Spanner spanner = options.getService()) { + final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + TransactionRunner runner = databaseClient.readWriteTransaction(); + runner.run( + transaction -> { + transaction.executeUpdate(UPDATE_FOO_STATEMENT); + return null; + }); + } + assertFalse(isServerSideTracing); + } + @Test 
public void testAdminRequestsLimitExceededRetryAlgorithm() { AdminRequestsLimitExceededRetryAlgorithm alg = @@ -535,6 +638,73 @@ public void testCustomUserAgent() { } } + @Test + public void testTraceContextHeaderWithOpenTelemetryAndServerSideTracingEnabled() { + OpenTelemetry openTelemetry = + OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .setTracerProvider(SdkTracerProvider.builder().setSampler(Sampler.alwaysOn()).build()) + .build(); + + final SpannerOptions options = + createSpannerOptions() + .toBuilder() + .setOpenTelemetry(openTelemetry) + .setEnableServerSideTracing(true) + .build(); + try (Spanner spanner = options.getService()) { + final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + + try (final ResultSet rs = databaseClient.singleUse().executeQuery(SELECT1AND2)) { + rs.next(); + } + + assertTrue(isTraceContextPresent); + } + } + + @Test + public void testTraceContextHeaderWithOpenTelemetryAndServerSideTracingDisabled() { + OpenTelemetry openTelemetry = + OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .setTracerProvider(SdkTracerProvider.builder().setSampler(Sampler.alwaysOn()).build()) + .build(); + + final SpannerOptions options = + createSpannerOptions() + .toBuilder() + .setOpenTelemetry(openTelemetry) + .setEnableServerSideTracing(false) + .build(); + try (Spanner spanner = options.getService()) { + final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + + try (final ResultSet rs = databaseClient.singleUse().executeQuery(SELECT1AND2)) { + rs.next(); + } + + assertFalse(isTraceContextPresent); + } + } + + @Test + public void testTraceContextHeaderWithoutOpenTelemetry() { + final SpannerOptions options = createSpannerOptions(); + try (Spanner spanner = options.getService()) { + 
final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + + try (final ResultSet rs = databaseClient.singleUse().executeQuery(SELECT1AND2)) { + rs.next(); + } + + assertFalse(isTraceContextPresent); + } + } + @Test public void testRouteToLeaderHeaderForReadOnly() { final SpannerOptions options = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java index cc43e2dc334..010f16bc4cc 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java @@ -94,6 +94,17 @@ public void testNewRouteToLeaderHeader() { assertTrue(Maps.difference(extraHeaders, expectedHeaders).areEqual()); } + @Test + public void testNewEndToEndTracingHeader() { + SpannerMetadataProvider metadataProvider = + SpannerMetadataProvider.create(ImmutableMap.of(), "header1"); + Map> extraHeaders = metadataProvider.newServerSideTracingHeader(); + Map> expectedHeaders = + ImmutableMap.>of( + "x-goog-spanner-end-to-end-tracing", ImmutableList.of("true")); + assertTrue(Maps.difference(extraHeaders, expectedHeaders).areEqual()); + } + private String getResourceHeaderValue( SpannerMetadataProvider headerProvider, String resourceTokenTemplate) { Metadata metadata = headerProvider.newMetadata(resourceTokenTemplate, "projects/p"); From 88f1e1e41dd978d0ff15b3c07ee2c971535bf58d Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 27 Aug 2024 18:39:54 +0000 Subject: [PATCH 2/8] Add changes in Spanner executor for testing end to end tracing --- google-cloud-spanner-executor/pom.xml | 30 ++++ .../executor/spanner/CloudClientExecutor.java | 166 +++++++++++++++--- .../executor/spanner/CloudExecutorImpl.java | 42 +++++ 
.../cloud/executor/spanner/CloudUtil.java | 2 + .../cloud/executor/spanner/WorkerProxy.java | 57 +++++- 5 files changed, 268 insertions(+), 29 deletions(-) diff --git a/google-cloud-spanner-executor/pom.xml b/google-cloud-spanner-executor/pom.xml index d0f83dfce9e..5019f307908 100644 --- a/google-cloud-spanner-executor/pom.xml +++ b/google-cloud-spanner-executor/pom.xml @@ -21,11 +21,41 @@ UTF-8 + + + + io.opentelemetry + opentelemetry-bom + 1.41.0 + pom + import + + + + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-sdk + + + com.google.cloud.opentelemetry + exporter-trace + 0.29.0 + com.google.cloud google-cloud-spanner + + com.google.cloud + google-cloud-trace + 2.47.0 + io.grpc grpc-api diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index 443a8faf238..3e340965193 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -18,6 +18,7 @@ import static com.google.cloud.spanner.TransactionRunner.TransactionCallable; +import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.paging.Page; import com.google.api.gax.retrying.RetrySettings; @@ -70,15 +71,21 @@ import com.google.cloud.spanner.TimestampBound; import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionRunner; +import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.Type; import com.google.cloud.spanner.Value; import com.google.cloud.spanner.encryption.CustomerManagedEncryption; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; +import 
com.google.cloud.trace.v1.TraceServiceClient; +import com.google.cloud.trace.v1.TraceServiceSettings; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.devtools.cloudtrace.v1.GetTraceRequest; +import com.google.devtools.cloudtrace.v1.Trace; +import com.google.devtools.cloudtrace.v1.TraceSpan; import com.google.longrunning.Operation; import com.google.protobuf.ByteString; import com.google.protobuf.util.Timestamps; @@ -152,6 +159,9 @@ import com.google.spanner.v1.TypeCode; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; @@ -332,24 +342,28 @@ public void startRWTransaction() throws Exception { // Try to commit return null; }; + io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current(); Runnable runnable = - () -> { - try { - runner = - optimistic - ? dbClient.readWriteTransaction(Options.optimisticLock()) - : dbClient.readWriteTransaction(); - LOGGER.log(Level.INFO, String.format("Ready to run callable %s\n", transactionSeed)); - runner.run(callable); - transactionSucceeded(runner.getCommitTimestamp().toProto()); - } catch (SpannerException e) { - LOGGER.log( - Level.WARNING, - String.format("Transaction runnable failed with exception %s\n", e.getMessage()), - e); - transactionFailed(e); - } - }; + context.wrap( + () -> { + try { + runner = + optimistic + ? 
dbClient.readWriteTransaction(Options.optimisticLock()) + : dbClient.readWriteTransaction(); + LOGGER.log( + Level.INFO, String.format("Ready to run callable %s\n", transactionSeed)); + runner.run(callable); + transactionSucceeded(runner.getCommitTimestamp().toProto()); + } catch (SpannerException e) { + LOGGER.log( + Level.WARNING, + String.format( + "Transaction runnable failed with exception %s\n", e.getMessage()), + e); + transactionFailed(e); + } + }); LOGGER.log( Level.INFO, String.format("Callable and Runnable created, ready to execute %s\n", transactionSeed)); @@ -753,6 +767,11 @@ public synchronized void closeBatchTxn() throws SpannerException { Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("action-pool-%d").build()); + // Thread pool to verify end to end traces. + private static final Executor serverSideTracesThreadPool = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("server-side-traces-pool-%d").build()); + private synchronized Spanner getClientWithTimeout( long timeoutSeconds, boolean useMultiplexedSession) throws IOException { if (clientWithTimeout != null) { @@ -815,6 +834,8 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex .setHost(HOST_PREFIX + WorkerProxy.spannerPort) .setCredentials(credentials) .setChannelProvider(channelProvider) + .setEnableServerSideTracing(true) + .setOpenTelemetry(WorkerProxy.openTelemetrySdk) .setSessionPoolOption(sessionPoolOptions); SpannerStubSettings.Builder stubSettingsBuilder = @@ -838,6 +859,85 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex return optionsBuilder.build().getService(); } + private TraceServiceClient traceServiceClient; + + // Return the trace service client, create one if not exists. 
+ private synchronized TraceServiceClient getTraceServiceClient() throws IOException { + if (traceServiceClient != null) { + return traceServiceClient; + } + // Create a trace service client + Credentials credentials; + if (WorkerProxy.serviceKeyFile.isEmpty()) { + credentials = NoCredentials.getInstance(); + } else { + credentials = + GoogleCredentials.fromStream( + new ByteArrayInputStream( + FileUtils.readFileToByteArray(new File(WorkerProxy.serviceKeyFile))), + HTTP_TRANSPORT_FACTORY); + } + + TraceServiceSettings traceServiceSettings = + TraceServiceSettings.newBuilder() + .setEndpoint(WorkerProxy.CLOUD_TRACE_ENDPOINT) + .setCredentialsProvider(FixedCredentialsProvider.create(credentials)) + .build(); + + traceServiceClient = TraceServiceClient.create(traceServiceSettings); + return traceServiceClient; + } + + private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction"; + private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction"; + + /* Handles verification of end to end traces */ + public Status startVerificationOfEndToEndTrace( + String traceId, ExecutionFlowContext executionContext) { + serverSideTracesThreadPool.execute( + () -> { + boolean isValidTrace = isExportedEndToEndTraceValid(traceId); + if (!isValidTrace) { + LOGGER.log(Level.WARNING, String.format("traceId:%s failed to be verified.", traceId)); + executionContext.onError( + Status.INTERNAL + .withDescription( + String.format( + "failed to verify end to end trace for trace_id: %s", traceId)) + .asRuntimeException()); + } + }); + return Status.OK; + } + + /* Returns whether an exported trace is valid. 
*/ + public boolean isExportedEndToEndTraceValid(String traceId) { + try { + GetTraceRequest getTraceRequest = + GetTraceRequest.newBuilder() + .setProjectId(WorkerProxy.PROJECT_ID) + .setTraceId(traceId) + .build(); + Trace trace = getTraceServiceClient().getTrace(getTraceRequest); + boolean readWriteOrReadOnlyTxnPresent = false, spannerServerSideSpanPresent = false; + for (TraceSpan span : trace.getSpansList()) { + if (span.getName() == READ_ONLY_TRANSACTION || span.getName() == READ_WRITE_TRANSACTION) { + readWriteOrReadOnlyTxnPresent = true; + } + if (span.getName().startsWith("Spanner.")) { + spannerServerSideSpanPresent = true; + } + } + if (readWriteOrReadOnlyTxnPresent && !spannerServerSideSpanPresent) { + return false; + } + } catch (IOException e) { + LOGGER.log(Level.WARNING, "failed to verify end to end traces.", e); + return false; + } + return true; + } + /** Handle actions. */ public Status startHandlingRequest( SpannerAsyncActionRequest req, ExecutionFlowContext executionContext) { @@ -862,17 +962,20 @@ public Status startHandlingRequest( useMultiplexedSession = false; } + io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current(); actionThreadPool.execute( - () -> { - Status status = - executeAction(outcomeSender, action, dbPath, useMultiplexedSession, executionContext); - if (!status.isOk()) { - LOGGER.log( - Level.WARNING, - String.format("Failed to execute action with error: %s\n%s", status, action)); - executionContext.onError(status.getCause()); - } - }); + context.wrap( + () -> { + Status status = + executeAction( + outcomeSender, action, dbPath, useMultiplexedSession, executionContext); + if (!status.isOk()) { + LOGGER.log( + Level.WARNING, + String.format("Failed to execute action with error: %s\n%s", status, action)); + executionContext.onError(status.getCause()); + } + })); return Status.OK; } @@ -883,7 +986,10 @@ private Status executeAction( String dbPath, boolean useMultiplexedSession, ExecutionFlowContext 
executionContext) { - + Tracer tracer = WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName()); + String actionType = action.getActionCase().toString(); + Span span = tracer.spanBuilder(String.format("performaction_%s", actionType)).startSpan(); + Scope scope = span.makeCurrent(); try { if (action.hasAdmin()) { return executeAdminAction(useMultiplexedSession, action.getAdmin(), outcomeSender); @@ -956,11 +1062,15 @@ private Status executeAction( ErrorCode.UNIMPLEMENTED, "Not implemented yet: \n" + action))); } } catch (Exception e) { + span.recordException(e); LOGGER.log(Level.WARNING, "Unexpected error: " + e.getMessage()); return outcomeSender.finishWithError( toStatus( SpannerExceptionFactory.newSpannerException( ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage()))); + } finally { + scope.close(); + span.end(); } } diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java index d2e7d9b19d1..04ef7ebdbcd 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java @@ -26,6 +26,10 @@ import com.google.spanner.executor.v1.SpannerOptions; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -40,16 +44,39 @@ public class CloudExecutorImpl extends SpannerExecutorProxyGrpc.SpannerExecutorP // Ratio of operations to use multiplexed sessions. private final double multiplexedSessionOperationsRatio; + // Count of checks performed to verify end to end traces using Cloud Trace APIs. 
+ private int cloudTraceCheckCount = 0; + + // Maximum checks allowed to verify end to end traces using Cloud Trace APIs. + private static final int MAX_CLOUD_TRACE_CHECK_LIMIT = 20; + public CloudExecutorImpl( boolean enableGrpcFaultInjector, double multiplexedSessionOperationsRatio) { clientExecutor = new CloudClientExecutor(enableGrpcFaultInjector); this.multiplexedSessionOperationsRatio = multiplexedSessionOperationsRatio; } + private synchronized void incrementCloudTraceCheckCount() { + cloudTraceCheckCount++; + } + + private synchronized int getCloudTraceCheckCount() { + return cloudTraceCheckCount; + } + /** Execute SpannerAsync action requests. */ @Override public StreamObserver executeActionAsync( StreamObserver responseObserver) { + // Create a top-level OpenTelemetry span for streaming request. + Tracer tracer = WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName()); + Span span = tracer.spanBuilder("java_systest_execute_actions_stream").setNoParent().startSpan(); + Scope scope = span.makeCurrent(); + + final String traceId = span.getSpanContext().getTraceId(); + final boolean isSampled = span.getSpanContext().getTraceFlags().isSampled(); + AtomicBoolean requestHasReadOrQueryAction = new AtomicBoolean(false); + CloudClientExecutor.ExecutionFlowContext executionContext = clientExecutor.new ExecutionFlowContext(responseObserver); return new StreamObserver() { @@ -86,6 +113,11 @@ public void onNext(SpannerAsyncActionRequest request) { Level.INFO, String.format("Updated request to set multiplexed session flag: \n%s", request)); } + String actionName = request.getAction().getActionCase().toString(); + if (actionName == "READ" || actionName == "QUERY") { + requestHasReadOrQueryAction.set(true); + } + Status status = clientExecutor.startHandlingRequest(request, executionContext); if (!status.isOk()) { LOGGER.log( @@ -104,9 +136,19 @@ public void onError(Throwable t) { @Override public void onCompleted() { + if (isSampled + && 
getCloudTraceCheckCount() < MAX_CLOUD_TRACE_CHECK_LIMIT + && requestHasReadOrQueryAction.get()) { + LOGGER.log( + Level.WARNING, String.format("traceId:%s will be verified for e2e tracing", traceId)); + incrementCloudTraceCheckCount(); + clientExecutor.startVerificationOfEndToEndTrace(traceId, executionContext); + } LOGGER.log(Level.INFO, "Client called Done, half closed"); executionContext.cleanup(); responseObserver.onCompleted(); + scope.close(); + span.end(); } }; } diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java index 30a4d98a354..17b98bbdada 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java @@ -22,6 +22,7 @@ import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.TransportChannel; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.spanner.spi.v1.TraceContextInterceptor; import com.google.common.net.HostAndPort; import io.grpc.ManagedChannelBuilder; import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; @@ -91,6 +92,7 @@ public static ManagedChannelBuilder getChannelBuilderForTestGFE( return channelBuilder .overrideAuthority(hostInCert) .sslContext(sslContext) + .intercept(new TraceContextInterceptor(WorkerProxy.openTelemetrySdk)) .negotiationType(NegotiationType.TLS); } catch (Throwable t) { throw new RuntimeException(t); diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java index 61034754f20..2146adb1d47 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java +++ 
b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java @@ -16,12 +16,29 @@ package com.google.cloud.executor.spanner; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.auth.Credentials; +import com.google.auth.http.HttpTransportFactory; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.opentelemetry.trace.TraceConfiguration; +import com.google.cloud.opentelemetry.trace.TraceExporter; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.cloud.spanner.SpannerOptions; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.protobuf.services.HealthStatusManager; import io.grpc.protobuf.services.ProtoReflectionService; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.io.ByteArrayInputStream; +import java.io.File; import java.io.IOException; import java.util.logging.Level; import java.util.logging.Logger; @@ -30,6 +47,7 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.io.FileUtils; /** * Worker proxy for Java API. 
This is the main entry of the Java client proxy on cloud Spanner Java @@ -55,13 +73,48 @@ public class WorkerProxy { public static double multiplexedSessionOperationsRatio = 0.0; public static boolean usePlainTextChannel = false; public static boolean enableGrpcFaultInjector = false; + public static OpenTelemetrySdk openTelemetrySdk; public static CommandLine commandLine; + public static final String PROJECT_ID = "spanner-cloud-systest"; + public static final String CLOUD_TRACE_ENDPOINT = "staging-cloudtrace.sandbox.googleapis.com:443"; + private static final int MIN_PORT = 0, MAX_PORT = 65535; - private static final double MIN_RATIO = 0.0, MAX_RATIO = 1.0; + private static final double MIN_RATIO = 0.0, MAX_RATIO = 1.0, TRACE_SAMPLING_RATE = 0.01; + + public static OpenTelemetrySdk setupOpenTelemetrySdk() throws Exception { + // Read credentials from the serviceKeyFile. + HttpTransportFactory HTTP_TRANSPORT_FACTORY = NetHttpTransport::new; + Credentials credentials = + GoogleCredentials.fromStream( + new ByteArrayInputStream(FileUtils.readFileToByteArray(new File(serviceKeyFile))), + HTTP_TRANSPORT_FACTORY); + + // OpenTelemetry configuration. + SpanExporter spanExporter = + TraceExporter.createWithConfiguration( + TraceConfiguration.builder() + .setProjectId(PROJECT_ID) + .setCredentials(credentials) + .setTraceServiceEndpoint(CLOUD_TRACE_ENDPOINT) + .build()); + return OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build()) + .setResource(Resource.getDefault()) + .setSampler(Sampler.parentBased(Sampler.traceIdRatioBased(TRACE_SAMPLING_RATE))) + .build()) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .build(); + } public static void main(String[] args) throws Exception { + // Enable OpenTelemetry metrics and traces before injecting Opentelemetry. 
+ SpannerOptions.enableOpenTelemetryMetrics(); + SpannerOptions.enableOpenTelemetryTraces(); + commandLine = buildOptions(args); if (!commandLine.hasOption(OPTION_SPANNER_PORT)) { @@ -117,6 +170,8 @@ public static void main(String[] args) throws Exception { + MAX_RATIO); } } + // Setup the OpenTelemetry for tracing. + openTelemetrySdk = setupOpenTelemetrySdk(); Server server; while (true) { From d263744a7e3ffafbbfac7ccd2c7062bf98e2328c Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Thu, 12 Sep 2024 07:21:51 +0000 Subject: [PATCH 3/8] fix github check failure for dependencies --- google-cloud-spanner-executor/pom.xml | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/google-cloud-spanner-executor/pom.xml b/google-cloud-spanner-executor/pom.xml index 5019f307908..7da24a45caf 100644 --- a/google-cloud-spanner-executor/pom.xml +++ b/google-cloud-spanner-executor/pom.xml @@ -38,14 +38,26 @@ io.opentelemetry opentelemetry-api + + io.opentelemetry + opentelemetry-context + io.opentelemetry opentelemetry-sdk + + io.opentelemetry + opentelemetry-sdk-common + + + io.opentelemetry + opentelemetry-sdk-trace + com.google.cloud.opentelemetry exporter-trace - 0.29.0 + 0.31.0 com.google.cloud @@ -124,6 +136,11 @@ com.google.api.grpc proto-google-cloud-spanner-executor-v1 + + com.google.api.grpc + proto-google-cloud-trace-v1 + 2.48.0 + com.google.api.grpc grpc-google-cloud-spanner-executor-v1 From cdfad8c6460c41ba2e970a7b83e9cf0dbc655961 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 1 Oct 2024 05:38:17 +0000 Subject: [PATCH 4/8] minor fix --- .../cloud/executor/spanner/CloudClientExecutor.java | 8 ++++---- .../com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java | 2 -- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java 
b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index bb70f7561c1..ca2066c7dbb 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -768,9 +768,9 @@ public synchronized void closeBatchTxn() throws SpannerException { new ThreadFactoryBuilder().setNameFormat("action-pool-%d").build()); // Thread pool to verify end to end traces. - private static final Executor serverSideTracesThreadPool = + private static final Executor endToEndTracesThreadPool = Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("server-side-traces-pool-%d").build()); + new ThreadFactoryBuilder().setNameFormat("end-to-end-traces-pool-%d").build()); private synchronized Spanner getClientWithTimeout( long timeoutSeconds, boolean useMultiplexedSession) throws IOException { @@ -837,7 +837,7 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex .setHost(HOST_PREFIX + WorkerProxy.spannerPort) .setCredentials(credentials) .setChannelProvider(channelProvider) - .setEnableServerSideTracing(true) + .setEnableEndToEndTracing(true) .setOpenTelemetry(WorkerProxy.openTelemetrySdk) .setSessionPoolOption(sessionPoolOptions); @@ -897,7 +897,7 @@ private synchronized TraceServiceClient getTraceServiceClient() throws IOExcepti /* Handles verification of end to end traces */ public Status startVerificationOfEndToEndTrace( String traceId, ExecutionFlowContext executionContext) { - serverSideTracesThreadPool.execute( + endToEndTracesThreadPool.execute( () -> { boolean isValidTrace = isExportedEndToEndTraceValid(traceId); if (!isValidTrace) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 
d86582f8630..2360b5d5173 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -275,7 +275,6 @@ public class GapicSpannerRpc implements SpannerRpc { private final boolean endToEndTracingEnabled; private final int numChannels; private final boolean isGrpcGcpExtensionEnabled; - private final boolean serverSideTracingEnabled; public static GapicSpannerRpc create(SpannerOptions options) { return new GapicSpannerRpc(options); @@ -330,7 +329,6 @@ public GapicSpannerRpc(final SpannerOptions options) { this.endToEndTracingEnabled = options.isEndToEndTracingEnabled(); this.numChannels = options.getNumChannels(); this.isGrpcGcpExtensionEnabled = options.isGrpcGcpExtensionEnabled(); - this.serverSideTracingEnabled = options.isServerSideTracingEnabled(); if (initializeStubs) { // First check if SpannerOptions provides a TransportChannelProvider. Create one From 94bde60a472e3cb7ecc50cb37928d9290fd5bf54 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Fri, 4 Oct 2024 05:01:27 +0000 Subject: [PATCH 5/8] minor fix --- google-cloud-spanner-executor/pom.xml | 16 ++-------------- .../executor/spanner/CloudClientExecutor.java | 6 +++--- .../spanner/spi/v1/TraceContextInterceptor.java | 4 ++-- 3 files changed, 7 insertions(+), 19 deletions(-) diff --git a/google-cloud-spanner-executor/pom.xml b/google-cloud-spanner-executor/pom.xml index 93b812a9da5..57a6ed632e8 100644 --- a/google-cloud-spanner-executor/pom.xml +++ b/google-cloud-spanner-executor/pom.xml @@ -21,18 +21,6 @@ UTF-8 - - - - io.opentelemetry - opentelemetry-bom - 1.41.0 - pom - import - - - - io.opentelemetry @@ -66,7 +54,7 @@ com.google.cloud google-cloud-trace - 2.47.0 + 2.51.0 io.grpc @@ -139,7 +127,7 @@ com.google.api.grpc proto-google-cloud-trace-v1 - 2.48.0 + 2.51.0 com.google.api.grpc diff --git 
a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index ca2066c7dbb..f33bdcf7d5f 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -891,9 +891,6 @@ private synchronized TraceServiceClient getTraceServiceClient() throws IOExcepti return traceServiceClient; } - private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction"; - private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction"; - /* Handles verification of end to end traces */ public Status startVerificationOfEndToEndTrace( String traceId, ExecutionFlowContext executionContext) { @@ -913,6 +910,9 @@ public Status startVerificationOfEndToEndTrace( return Status.OK; } + private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction"; + private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction"; + /* Returns whether a exported trace is valid. */ public boolean isExportedEndToEndTraceValid(String traceId) { try { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java index 3b46ba4f880..4280e310355 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java @@ -33,11 +33,11 @@ * Spanner. This class takes reference from OpenTelemetry's JAVA instrumentation library for gRPC. 
* https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/9ecf7965aa455d41ea8cc0761b6c6b6eeb106324/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java#L27 */ -class TraceContextInterceptor implements ClientInterceptor { +public class TraceContextInterceptor implements ClientInterceptor { private final TextMapPropagator textMapPropagator; - TraceContextInterceptor(OpenTelemetry openTelemetry) { + public TraceContextInterceptor(OpenTelemetry openTelemetry) { this.textMapPropagator = openTelemetry.getPropagators().getTextMapPropagator(); } From 23bc11f1a701327e5949ef17f0c0fbd413906bd8 Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 15 Oct 2024 22:24:07 +0000 Subject: [PATCH 6/8] fix issue related to traces being not returned from getTrace api call need to wait some time for traces to be visible. --- .../executor/spanner/CloudClientExecutor.java | 43 +++++++++++-------- .../executor/spanner/CloudExecutorImpl.java | 38 +++++++++++++--- 2 files changed, 57 insertions(+), 24 deletions(-) diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index f33bdcf7d5f..c363e35c1e1 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -176,7 +176,9 @@ import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -768,7 +770,7 @@ public synchronized void closeBatchTxn() throws 
SpannerException { new ThreadFactoryBuilder().setNameFormat("action-pool-%d").build()); // Thread pool to verify end to end traces. - private static final Executor endToEndTracesThreadPool = + private static final ExecutorService endToEndTracesThreadPool = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("end-to-end-traces-pool-%d").build()); @@ -891,23 +893,27 @@ private synchronized TraceServiceClient getTraceServiceClient() throws IOExcepti return traceServiceClient; } - /* Handles verification of end to end traces */ - public Status startVerificationOfEndToEndTrace( - String traceId, ExecutionFlowContext executionContext) { - endToEndTracesThreadPool.execute( + public Future getEndToEndTraceVerificationTask(String traceId) { + return endToEndTracesThreadPool.submit( () -> { - boolean isValidTrace = isExportedEndToEndTraceValid(traceId); - if (!isValidTrace) { - LOGGER.log(Level.WARNING, String.format("traceId:%s failed to be verified.", traceId)); - executionContext.onError( - Status.INTERNAL - .withDescription( - String.format( - "failed to verify end to end trace for trace_id: %s", traceId)) - .getCause()); + try { + // Wait for 10 seconds before verifying to ensure traces are exported. 
+ long sleepDuration = TimeUnit.SECONDS.toMillis(10); + LOGGER.log( + Level.INFO, + String.format( + "Sleeping for %d milliseconds before verifying end to end trace", + sleepDuration)); + Thread.sleep(sleepDuration); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Handle interruption + LOGGER.log( + Level.INFO, + String.format("Thread interrupted.")); + return false; // Return false if interrupted } + return isExportedEndToEndTraceValid(traceId); }); - return Status.OK; } private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction"; @@ -924,7 +930,8 @@ public boolean isExportedEndToEndTraceValid(String traceId) { Trace trace = getTraceServiceClient().getTrace(getTraceRequest); boolean readWriteOrReadOnlyTxnPresent = false, spannerServerSideSpanPresent = false; for (TraceSpan span : trace.getSpansList()) { - if (span.getName() == READ_ONLY_TRANSACTION || span.getName() == READ_WRITE_TRANSACTION) { + if (span.getName().contains(READ_ONLY_TRANSACTION) + || span.getName().contains(READ_WRITE_TRANSACTION)) { readWriteOrReadOnlyTxnPresent = true; } if (span.getName().startsWith("Spanner.")) { @@ -934,8 +941,8 @@ public boolean isExportedEndToEndTraceValid(String traceId) { if (readWriteOrReadOnlyTxnPresent && !spannerServerSideSpanPresent) { return false; } - } catch (IOException e) { - LOGGER.log(Level.WARNING, "failed to verify end to end traces.", e); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Failed to verify end to end trace.", e); return false; } return true; diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java index 04ef7ebdbcd..6fee10c95b6 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java +++ 
b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java @@ -29,6 +29,7 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Scope; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -136,19 +137,44 @@ public void onError(Throwable t) { @Override public void onCompleted() { + // Close the scope and end the span. + scope.close(); + span.end(); if (isSampled && getCloudTraceCheckCount() < MAX_CLOUD_TRACE_CHECK_LIMIT && requestHasReadOrQueryAction.get()) { - LOGGER.log( - Level.WARNING, String.format("traceId:%s will be verified for e2e tracing", traceId)); - incrementCloudTraceCheckCount(); - clientExecutor.startVerificationOfEndToEndTrace(traceId, executionContext); + Future traceVerificationTask = + clientExecutor.getEndToEndTraceVerificationTask(traceId); + try { + LOGGER.log( + Level.INFO, + String.format("Starting end to end trace verification for trace_id:%s", traceId)); + Boolean isValidTrace = traceVerificationTask.get(); + incrementCloudTraceCheckCount(); + if (!isValidTrace) { + executionContext.onError( + Status.INTERNAL + .withDescription( + String.format( + "failed to verify end to end trace for trace_id: %s", traceId)) + .getCause()); + executionContext.cleanup(); + return; + } + } catch (Exception e) { + LOGGER.log( + Level.WARNING, + String.format( + "Failed to verify end to end trace with exception: %s\n", e.getMessage()), + e); + executionContext.onError(e); + executionContext.cleanup(); + return; + } } LOGGER.log(Level.INFO, "Client called Done, half closed"); executionContext.cleanup(); responseObserver.onCompleted(); - scope.close(); - span.end(); } }; } From 891d596a9227fa39e4d0b92b0956715f7687663b Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 15 Oct 2024 22:27:23 +0000 Subject: [PATCH 7/8] fix formatting --- 
.../google/cloud/executor/spanner/CloudClientExecutor.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index c363e35c1e1..714dbc309cc 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -907,9 +907,7 @@ public Future getEndToEndTraceVerificationTask(String traceId) { Thread.sleep(sleepDuration); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // Handle interruption - LOGGER.log( - Level.INFO, - String.format("Thread interrupted.")); + LOGGER.log(Level.INFO, String.format("Thread interrupted.")); return false; // Return false if interrupted } return isExportedEndToEndTraceValid(traceId); From d6f3f4c1b101a133a19936480477993a87cad94e Mon Sep 17 00:00:00 2001 From: Naresh Chaudhary Date: Tue, 15 Oct 2024 22:56:29 +0000 Subject: [PATCH 8/8] update dependencies --- google-cloud-spanner-executor/pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner-executor/pom.xml b/google-cloud-spanner-executor/pom.xml index e5163a37660..9842973a927 100644 --- a/google-cloud-spanner-executor/pom.xml +++ b/google-cloud-spanner-executor/pom.xml @@ -45,7 +45,7 @@ com.google.cloud.opentelemetry exporter-trace - 0.31.0 + 0.32.0 com.google.cloud @@ -54,7 +54,7 @@ com.google.cloud google-cloud-trace - 2.51.0 + 2.52.0 io.grpc @@ -127,7 +127,7 @@ com.google.api.grpc proto-google-cloud-trace-v1 - 2.51.0 + 2.52.0 com.google.api.grpc