diff --git a/google-cloud-spanner-executor/pom.xml b/google-cloud-spanner-executor/pom.xml index 1d688dea1a..9842973a92 100644 --- a/google-cloud-spanner-executor/pom.xml +++ b/google-cloud-spanner-executor/pom.xml @@ -22,10 +22,40 @@ + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-context + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-sdk-common + + + io.opentelemetry + opentelemetry-sdk-trace + + + com.google.cloud.opentelemetry + exporter-trace + 0.32.0 + com.google.cloud google-cloud-spanner + + com.google.cloud + google-cloud-trace + 2.52.0 + io.grpc grpc-api @@ -94,6 +124,11 @@ com.google.api.grpc proto-google-cloud-spanner-executor-v1 + + com.google.api.grpc + proto-google-cloud-trace-v1 + 2.52.0 + com.google.api.grpc grpc-google-cloud-spanner-executor-v1 diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index d180f55d06..714dbc309c 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -18,6 +18,7 @@ import static com.google.cloud.spanner.TransactionRunner.TransactionCallable; +import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.paging.Page; import com.google.api.gax.retrying.RetrySettings; @@ -70,15 +71,21 @@ import com.google.cloud.spanner.TimestampBound; import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionRunner; +import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.Type; import com.google.cloud.spanner.Value; import 
com.google.cloud.spanner.encryption.CustomerManagedEncryption; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; +import com.google.cloud.trace.v1.TraceServiceClient; +import com.google.cloud.trace.v1.TraceServiceSettings; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.devtools.cloudtrace.v1.GetTraceRequest; +import com.google.devtools.cloudtrace.v1.Trace; +import com.google.devtools.cloudtrace.v1.TraceSpan; import com.google.longrunning.Operation; import com.google.protobuf.ByteString; import com.google.protobuf.util.Timestamps; @@ -152,6 +159,9 @@ import com.google.spanner.v1.TypeCode; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; @@ -166,7 +176,9 @@ import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -332,24 +344,28 @@ public void startRWTransaction() throws Exception { // Try to commit return null; }; + io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current(); Runnable runnable = - () -> { - try { - runner = - optimistic - ? 
dbClient.readWriteTransaction(Options.optimisticLock()) - : dbClient.readWriteTransaction(); - LOGGER.log(Level.INFO, String.format("Ready to run callable %s\n", transactionSeed)); - runner.run(callable); - transactionSucceeded(runner.getCommitTimestamp().toProto()); - } catch (SpannerException e) { - LOGGER.log( - Level.WARNING, - String.format("Transaction runnable failed with exception %s\n", e.getMessage()), - e); - transactionFailed(e); - } - }; + context.wrap( + () -> { + try { + runner = + optimistic + ? dbClient.readWriteTransaction(Options.optimisticLock()) + : dbClient.readWriteTransaction(); + LOGGER.log( + Level.INFO, String.format("Ready to run callable %s\n", transactionSeed)); + runner.run(callable); + transactionSucceeded(runner.getCommitTimestamp().toProto()); + } catch (SpannerException e) { + LOGGER.log( + Level.WARNING, + String.format( + "Transaction runnable failed with exception %s\n", e.getMessage()), + e); + transactionFailed(e); + } + }); LOGGER.log( Level.INFO, String.format("Callable and Runnable created, ready to execute %s\n", transactionSeed)); @@ -753,6 +769,11 @@ public synchronized void closeBatchTxn() throws SpannerException { Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("action-pool-%d").build()); + // Thread pool to verify end to end traces. 
+ private static final ExecutorService endToEndTracesThreadPool = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("end-to-end-traces-pool-%d").build()); + private synchronized Spanner getClientWithTimeout( long timeoutSeconds, boolean useMultiplexedSession) throws IOException { if (clientWithTimeout != null) { @@ -818,6 +839,8 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex .setHost(HOST_PREFIX + WorkerProxy.spannerPort) .setCredentials(credentials) .setChannelProvider(channelProvider) + .setEnableEndToEndTracing(true) + .setOpenTelemetry(WorkerProxy.openTelemetrySdk) .setSessionPoolOption(sessionPoolOptions); SpannerStubSettings.Builder stubSettingsBuilder = @@ -841,6 +864,88 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex return optionsBuilder.build().getService(); } + private TraceServiceClient traceServiceClient; + + // Return the trace service client, create one if not exists. + private synchronized TraceServiceClient getTraceServiceClient() throws IOException { + if (traceServiceClient != null) { + return traceServiceClient; + } + // Create a trace service client + Credentials credentials; + if (WorkerProxy.serviceKeyFile.isEmpty()) { + credentials = NoCredentials.getInstance(); + } else { + credentials = + GoogleCredentials.fromStream( + new ByteArrayInputStream( + FileUtils.readFileToByteArray(new File(WorkerProxy.serviceKeyFile))), + HTTP_TRANSPORT_FACTORY); + } + + TraceServiceSettings traceServiceSettings = + TraceServiceSettings.newBuilder() + .setEndpoint(WorkerProxy.CLOUD_TRACE_ENDPOINT) + .setCredentialsProvider(FixedCredentialsProvider.create(credentials)) + .build(); + + traceServiceClient = TraceServiceClient.create(traceServiceSettings); + return traceServiceClient; + } + + public Future getEndToEndTraceVerificationTask(String traceId) { + return endToEndTracesThreadPool.submit( + () -> { + try { + // Wait for 10 seconds before verifying to ensure 
traces are exported. + long sleepDuration = TimeUnit.SECONDS.toMillis(10); + LOGGER.log( + Level.INFO, + String.format( + "Sleeping for %d milliseconds before verifying end to end trace", + sleepDuration)); + Thread.sleep(sleepDuration); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Handle interruption + LOGGER.log(Level.INFO, String.format("Thread interrupted.")); + return false; // Return false if interrupted + } + return isExportedEndToEndTraceValid(traceId); + }); + } + + private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction"; + private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction"; + + /* Returns whether a exported trace is valid. */ + public boolean isExportedEndToEndTraceValid(String traceId) { + try { + GetTraceRequest getTraceRequest = + GetTraceRequest.newBuilder() + .setProjectId(WorkerProxy.PROJECT_ID) + .setTraceId(traceId) + .build(); + Trace trace = getTraceServiceClient().getTrace(getTraceRequest); + boolean readWriteOrReadOnlyTxnPresent = false, spannerServerSideSpanPresent = false; + for (TraceSpan span : trace.getSpansList()) { + if (span.getName().contains(READ_ONLY_TRANSACTION) + || span.getName().contains(READ_WRITE_TRANSACTION)) { + readWriteOrReadOnlyTxnPresent = true; + } + if (span.getName().startsWith("Spanner.")) { + spannerServerSideSpanPresent = true; + } + } + if (readWriteOrReadOnlyTxnPresent && !spannerServerSideSpanPresent) { + return false; + } + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Failed to verify end to end trace.", e); + return false; + } + return true; + } + /** Handle actions. 
*/ public Status startHandlingRequest( SpannerAsyncActionRequest req, ExecutionFlowContext executionContext) { @@ -865,17 +970,20 @@ public Status startHandlingRequest( useMultiplexedSession = false; } + io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current(); actionThreadPool.execute( - () -> { - Status status = - executeAction(outcomeSender, action, dbPath, useMultiplexedSession, executionContext); - if (!status.isOk()) { - LOGGER.log( - Level.WARNING, - String.format("Failed to execute action with error: %s\n%s", status, action)); - executionContext.onError(status.getCause()); - } - }); + context.wrap( + () -> { + Status status = + executeAction( + outcomeSender, action, dbPath, useMultiplexedSession, executionContext); + if (!status.isOk()) { + LOGGER.log( + Level.WARNING, + String.format("Failed to execute action with error: %s\n%s", status, action)); + executionContext.onError(status.getCause()); + } + })); return Status.OK; } @@ -886,7 +994,10 @@ private Status executeAction( String dbPath, boolean useMultiplexedSession, ExecutionFlowContext executionContext) { - + Tracer tracer = WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName()); + String actionType = action.getActionCase().toString(); + Span span = tracer.spanBuilder(String.format("performaction_%s", actionType)).startSpan(); + Scope scope = span.makeCurrent(); try { if (action.hasAdmin()) { return executeAdminAction(useMultiplexedSession, action.getAdmin(), outcomeSender); @@ -959,11 +1070,15 @@ private Status executeAction( ErrorCode.UNIMPLEMENTED, "Not implemented yet: \n" + action))); } } catch (Exception e) { + span.recordException(e); LOGGER.log(Level.WARNING, "Unexpected error: " + e.getMessage()); return outcomeSender.finishWithError( toStatus( SpannerExceptionFactory.newSpannerException( ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage()))); + } finally { + scope.close(); + span.end(); } } diff --git 
a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java index d2e7d9b19d..6fee10c95b 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java @@ -26,6 +26,11 @@ import com.google.spanner.executor.v1.SpannerOptions; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -40,16 +45,39 @@ public class CloudExecutorImpl extends SpannerExecutorProxyGrpc.SpannerExecutorP // Ratio of operations to use multiplexed sessions. private final double multiplexedSessionOperationsRatio; + // Count of checks performed to verify end to end traces using Cloud Trace APIs. + private int cloudTraceCheckCount = 0; + + // Maximum checks allowed to verify end to end traces using Cloud Trace APIs. + private static final int MAX_CLOUD_TRACE_CHECK_LIMIT = 20; + public CloudExecutorImpl( boolean enableGrpcFaultInjector, double multiplexedSessionOperationsRatio) { clientExecutor = new CloudClientExecutor(enableGrpcFaultInjector); this.multiplexedSessionOperationsRatio = multiplexedSessionOperationsRatio; } + private synchronized void incrementCloudTraceCheckCount() { + cloudTraceCheckCount++; + } + + private synchronized int getCloudTraceCheckCount() { + return cloudTraceCheckCount; + } + /** Execute SpannerAsync action requests. */ @Override public StreamObserver executeActionAsync( StreamObserver responseObserver) { + // Create a top-level OpenTelemetry span for streaming request. 
+ Tracer tracer = WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName()); + Span span = tracer.spanBuilder("java_systest_execute_actions_stream").setNoParent().startSpan(); + Scope scope = span.makeCurrent(); + + final String traceId = span.getSpanContext().getTraceId(); + final boolean isSampled = span.getSpanContext().getTraceFlags().isSampled(); + AtomicBoolean requestHasReadOrQueryAction = new AtomicBoolean(false); + CloudClientExecutor.ExecutionFlowContext executionContext = clientExecutor.new ExecutionFlowContext(responseObserver); return new StreamObserver() { @@ -86,6 +114,11 @@ public void onNext(SpannerAsyncActionRequest request) { Level.INFO, String.format("Updated request to set multiplexed session flag: \n%s", request)); } + String actionName = request.getAction().getActionCase().toString(); + if ("READ".equals(actionName) || "QUERY".equals(actionName)) { // equals(): == compares references + requestHasReadOrQueryAction.set(true); + } + Status status = clientExecutor.startHandlingRequest(request, executionContext); if (!status.isOk()) { LOGGER.log( @@ -104,6 +137,41 @@ public void onError(Throwable t) { @Override public void onCompleted() { + // Close the scope and end the span. 
+ scope.close(); + span.end(); + if (isSampled + && getCloudTraceCheckCount() < MAX_CLOUD_TRACE_CHECK_LIMIT + && requestHasReadOrQueryAction.get()) { + Future traceVerificationTask = + clientExecutor.getEndToEndTraceVerificationTask(traceId); + try { + LOGGER.log( + Level.INFO, + String.format("Starting end to end trace verification for trace_id:%s", traceId)); + Boolean isValidTrace = traceVerificationTask.get(); + incrementCloudTraceCheckCount(); + if (!isValidTrace) { + executionContext.onError( + Status.INTERNAL + .withDescription( + String.format( + "failed to verify end to end trace for trace_id: %s", traceId)) + .asRuntimeException()); // getCause() is null here: no cause was attached + executionContext.cleanup(); + return; + } + } catch (Exception e) { + LOGGER.log( + Level.WARNING, + String.format( + "Failed to verify end to end trace with exception: %s\n", e.getMessage()), + e); + executionContext.onError(e); + executionContext.cleanup(); + return; + } + } LOGGER.log(Level.INFO, "Client called Done, half closed"); executionContext.cleanup(); responseObserver.onCompleted(); diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java index 30a4d98a35..17b98bbdad 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java @@ -22,6 +22,7 @@ import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.TransportChannel; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.spanner.spi.v1.TraceContextInterceptor; import com.google.common.net.HostAndPort; import io.grpc.ManagedChannelBuilder; import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; @@ -91,6 +92,7 @@ public static ManagedChannelBuilder getChannelBuilderForTestGFE( return channelBuilder .overrideAuthority(hostInCert) 
.sslContext(sslContext) + .intercept(new TraceContextInterceptor(WorkerProxy.openTelemetrySdk)) .negotiationType(NegotiationType.TLS); } catch (Throwable t) { throw new RuntimeException(t); diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java index 61034754f2..2146adb1d4 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java @@ -16,12 +16,29 @@ package com.google.cloud.executor.spanner; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.auth.Credentials; +import com.google.auth.http.HttpTransportFactory; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.opentelemetry.trace.TraceConfiguration; +import com.google.cloud.opentelemetry.trace.TraceExporter; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.cloud.spanner.SpannerOptions; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.protobuf.services.HealthStatusManager; import io.grpc.protobuf.services.ProtoReflectionService; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.io.ByteArrayInputStream; +import java.io.File; import java.io.IOException; import java.util.logging.Level; import java.util.logging.Logger; @@ -30,6 +47,7 @@ import 
org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.io.FileUtils; /** * Worker proxy for Java API. This is the main entry of the Java client proxy on cloud Spanner Java @@ -55,13 +73,48 @@ public class WorkerProxy { public static double multiplexedSessionOperationsRatio = 0.0; public static boolean usePlainTextChannel = false; public static boolean enableGrpcFaultInjector = false; + public static OpenTelemetrySdk openTelemetrySdk; public static CommandLine commandLine; + public static final String PROJECT_ID = "spanner-cloud-systest"; + public static final String CLOUD_TRACE_ENDPOINT = "staging-cloudtrace.sandbox.googleapis.com:443"; + private static final int MIN_PORT = 0, MAX_PORT = 65535; - private static final double MIN_RATIO = 0.0, MAX_RATIO = 1.0; + private static final double MIN_RATIO = 0.0, MAX_RATIO = 1.0, TRACE_SAMPLING_RATE = 0.01; + + public static OpenTelemetrySdk setupOpenTelemetrySdk() throws Exception { + // Read credentials from the serviceKeyFile. + HttpTransportFactory HTTP_TRANSPORT_FACTORY = NetHttpTransport::new; + Credentials credentials = + GoogleCredentials.fromStream( + new ByteArrayInputStream(FileUtils.readFileToByteArray(new File(serviceKeyFile))), + HTTP_TRANSPORT_FACTORY); + + // OpenTelemetry configuration. 
+ SpanExporter spanExporter = + TraceExporter.createWithConfiguration( + TraceConfiguration.builder() + .setProjectId(PROJECT_ID) + .setCredentials(credentials) + .setTraceServiceEndpoint(CLOUD_TRACE_ENDPOINT) + .build()); + return OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build()) + .setResource(Resource.getDefault()) + .setSampler(Sampler.parentBased(Sampler.traceIdRatioBased(TRACE_SAMPLING_RATE))) + .build()) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .build(); + } public static void main(String[] args) throws Exception { + // Enable OpenTelemetry metrics and traces before injecting Opentelemetry. + SpannerOptions.enableOpenTelemetryMetrics(); + SpannerOptions.enableOpenTelemetryTraces(); + commandLine = buildOptions(args); if (!commandLine.hasOption(OPTION_SPANNER_PORT)) { @@ -117,6 +170,8 @@ public static void main(String[] args) throws Exception { + MAX_RATIO); } } + // Setup the OpenTelemetry for tracing. + openTelemetrySdk = setupOpenTelemetrySdk(); Server server; while (true) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java index 3b46ba4f88..4280e31035 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/TraceContextInterceptor.java @@ -33,11 +33,11 @@ * Spanner. This class takes reference from OpenTelemetry's JAVA instrumentation library for gRPC. 
* https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/9ecf7965aa455d41ea8cc0761b6c6b6eeb106324/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java#L27 */ -class TraceContextInterceptor implements ClientInterceptor { +public class TraceContextInterceptor implements ClientInterceptor { private final TextMapPropagator textMapPropagator; - TraceContextInterceptor(OpenTelemetry openTelemetry) { + public TraceContextInterceptor(OpenTelemetry openTelemetry) { this.textMapPropagator = openTelemetry.getPropagators().getTextMapPropagator(); }