From 2c1ea2695729590d578865455827fbcd4710e803 Mon Sep 17 00:00:00 2001 From: Sydney Munro Date: Thu, 24 Oct 2024 13:18:35 -0700 Subject: [PATCH 1/6] wip create(Blob) instrumentation --- .../google/cloud/storage/GrpcStorageImpl.java | 78 +++++++++++----- ...lelCompositeUploadWritableByteChannel.java | 6 +- .../com/google/cloud/storage/StorageImpl.java | 7 +- .../google/cloud/storage/StorageInternal.java | 7 +- .../cloud/storage/GrpcOpenTelemetryTest.java | 91 +++++++++++++++++++ ...ompositeUploadWritableByteChannelTest.java | 39 ++++++-- .../cloud/storage/otel/TestExporter.java | 50 ++++++++++ 7 files changed, 239 insertions(+), 39 deletions(-) create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/GrpcOpenTelemetryTest.java create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/otel/TestExporter.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index e139220de2..2735676f72 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -248,8 +248,23 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option public Blob create( BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options) { Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); - return internalDirectUpload(blobInfo, opts, ByteBuffer.wrap(content, offset, length)) - .asBlob(this); + // Start the otel span to retain information of the origin of the request + OpenTelemetryTraceUtil.Span otelSpan = + openTelemetryTraceUtil.startSpan("create(BlobInfo, BlobTargetOption"); + try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) { + return internalDirectUpload( + blobInfo, + opts, + ByteBuffer.wrap(content, offset, length), + openTelemetryTraceUtil.currentContext()) + .asBlob(this); + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + throw StorageException.coalesce(e); + } finally { + otelSpan.end(); + } } @Override @@ -799,9 +814,14 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options @Override public BlobInfo internalDirectUpload( - BlobInfo blobInfo, Opts opts, ByteBuffer buf) { + BlobInfo blobInfo, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { requireNonNull(blobInfo, "blobInfo must be non null"); requireNonNull(buf, "content must be non null"); + OpenTelemetryTraceUtil.Span otelSpan = + openTelemetryTraceUtil.startSpan("internalDirectUpload(BlobInfo)", ctx); Opts optsWithDefaults = opts.prepend(defaultOpts); GrpcCallContext grpcCallContext = optsWithDefaults.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); @@ -809,28 +829,36 @@ public BlobInfo internalDirectUpload( Hasher hasher = Hasher.enabled(); GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); RewindableContent content = RewindableContent.of(buf); - return Retrying.run( - getOptions(), - retryAlgorithmManager.getFor(req), - () -> { - content.rewindTo(0); - UnbufferedWritableByteChannelSession session = - ResumableMedia.gapic() - .write() - .byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge)) - .setByteStringStrategy(ByteStringStrategy.noCopy()) - .setHasher(hasher) - .direct() - .unbuffered() - .setRequest(req) - .build(); - - try (UnbufferedWritableByteChannel c = session.open()) { - content.writeTo(c); - } - return session.getResult(); - }, - this::getBlob); + try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) { + return Retrying.run( + getOptions(), + retryAlgorithmManager.getFor(req), + () -> { + content.rewindTo(0); + UnbufferedWritableByteChannelSession session = + ResumableMedia.gapic() + .write() + .byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge)) + .setByteStringStrategy(ByteStringStrategy.noCopy()) + .setHasher(hasher) + .direct() + .unbuffered() + .setRequest(req) + .build(); + + try (UnbufferedWritableByteChannel c = session.open()) { + content.writeTo(c); + } + return session.getResult(); + }, + this::getBlob); + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + throw StorageException.coalesce(e); + } finally { + otelSpan.end(); + } } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java index db807ce6ce..bb4edc2e3b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java @@ -219,7 +219,8 @@ public void close() throws IOException { // We never created any parts // create an empty object try { - BlobInfo blobInfo = storage.internalDirectUpload(ultimateObject, opts, Buffers.allocate(0)); + BlobInfo blobInfo = + storage.internalDirectUpload(ultimateObject, opts, Buffers.allocate(0), null); finalObject.set(blobInfo); return; } catch (StorageException se) { @@ -285,7 +286,8 @@ private void internalFlush(ByteBuffer buf) { ApiFutures.immediateFuture(partInfo), info -> { try { - return storage.internalDirectUpload(info, partOpts, buf); + // TODO: Add in Otel context when available + return storage.internalDirectUpload(info, partOpts, buf, null); } catch (StorageException e) { // a precondition failure usually means the part was created, but we didn't get the // response. And when we tried to retry the object already exists. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index ee538bcde8..1726660ca9 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -50,6 +50,7 @@ import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.cloud.storage.otel.OpenTelemetryTraceUtil; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.cloud.storage.spi.v1.StorageRpc.RewriteRequest; import com.google.common.base.CharMatcher; @@ -1737,7 +1738,11 @@ public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts opts, ByteBuffer buf) { + public BlobInfo internalDirectUpload( + BlobInfo info, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { BlobInfo.Builder builder = info.toBuilder() diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java index 0d700c46df..627654f56f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java @@ -20,6 +20,7 @@ import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.cloud.storage.otel.OpenTelemetryTraceUtil; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Path; @@ -31,7 +32,11 @@ default BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts opts, ByteBuffer buf) { + default BlobInfo internalDirectUpload( + BlobInfo info, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { throw new UnsupportedOperationException("not implemented"); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/GrpcOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/GrpcOpenTelemetryTest.java new file mode 100644 index 0000000000..5aaa20eacf --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/GrpcOpenTelemetryTest.java @@ -0,0 +1,91 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.google.cloud.storage; + +import com.google.api.core.ApiClock; +import com.google.cloud.storage.it.runner.StorageITRunner; +import com.google.cloud.storage.it.runner.annotations.Backend; +import com.google.cloud.storage.it.runner.annotations.Inject; +import com.google.cloud.storage.it.runner.annotations.SingleBackend; +import com.google.cloud.storage.it.runner.registry.TestBench; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(StorageITRunner.class) +@SingleBackend(Backend.TEST_BENCH) +public class GrpcOpenTelemetryTest { + @Inject public TestBench testBench; + private static final ApiClock TIME_SOURCE = + new ApiClock() { + @Override + public long nanoTime() { + return 42_000_000_000L; + } + + @Override + public long millisTime() { + return 42_000L; + } + }; + private StorageOptions options; + private SpanExporter exporter = new TestExporter(); + + @Before + public void setUp() { + OpenTelemetrySdk openTelemetrySdk = + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(exporter)) + .build()) + .build(); + options = + StorageOptions.grpc() + .setHost(testBench.getGRPCBaseUri()) + .setProjectId("projectId") + .setOpenTelemetrySdk(openTelemetrySdk) + .build(); + } + + @Test + public void runCreateBucket() { + Storage storage = options.getService(); + String bucket = "random-bucket"; + storage.create(BucketInfo.of(bucket)); + TestExporter testExported = (TestExporter) exporter; + SpanData spanData = testExported.getExportedSpans().get(0); + Assert.assertEquals("Storage", getAttributeValue(spanData, "gcp.client.service")); + Assert.assertEquals("googleapis/java-storage", getAttributeValue(spanData, "gcp.client.repo")); + Assert.assertEquals( + "com.google.cloud.google-cloud-storage", + getAttributeValue(spanData, "gcp.client.artifact")); + Assert.assertEquals("grpc", getAttributeValue(spanData, "rpc.system")); + } + + private String getAttributeValue(SpanData spanData, String key) { + return spanData.getAttributes().get(AttributeKey.stringKey(key)).toString(); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java index 48bb4137e3..1e57b2a0b1 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java @@ -39,6 +39,7 @@ import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.cloud.storage.otel.OpenTelemetryTraceUtil; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -356,9 +357,12 @@ public void partsRetainMetadata() throws Exception { new FakeStorageInternal() { @Override public BlobInfo internalDirectUpload( - BlobInfo info, Opts opts, ByteBuffer buf) { + BlobInfo info, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { metadatas.add(info.getMetadata()); - return super.internalDirectUpload(info, opts, buf); + return super.internalDirectUpload(info, opts, buf, null); } @Override @@ -446,7 +450,10 @@ public void creatingAnEmptyObjectWhichFailsIsSetAsResultFailureAndThrowFromClose new FakeStorageInternal() { @Override public BlobInfo internalDirectUpload( - BlobInfo info, Opts opts, ByteBuffer buf) { + BlobInfo info, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { throw StorageException.coalesce( ApiExceptionFactory.createException( null, GrpcStatusCode.of(Code.PERMISSION_DENIED), false)); @@ -557,7 +564,10 @@ public void shortCircuitExceptionResultsInFastFailure() throws Exception { new FakeStorageInternal() { @Override public BlobInfo internalDirectUpload( - BlobInfo info, Opts opts, ByteBuffer buf) { + BlobInfo info, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { if (induceFailure.getAndSet(false)) { Uninterruptibles.awaitUninterruptibly(blockForWrite1); try { @@ -571,7 +581,7 @@ public BlobInfo internalDirectUpload( blockForWrite1Complete.countDown(); } } else { - return super.internalDirectUpload(info, opts, buf); + return super.internalDirectUpload(info, opts, buf, null); } } }; @@ -714,8 +724,11 @@ public void partFailedPreconditionOnRetryIsHandledGracefully() throws Exception new FakeStorageInternal() { @Override public BlobInfo internalDirectUpload( - BlobInfo info, Opts opts, ByteBuffer buf) { - BlobInfo blobInfo = super.internalDirectUpload(info, opts, buf); + BlobInfo info, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { + BlobInfo blobInfo = super.internalDirectUpload(info, opts, buf, null); if (info.getName().equals(p1.getName())) { throw StorageException.coalesce( ApiExceptionFactory.createException( @@ -778,7 +791,10 @@ public void partMetadataFieldDecoratorUsesCustomTime() throws IOException { new FakeStorageInternal() { @Override public BlobInfo internalDirectUpload( - BlobInfo info, Opts opts, ByteBuffer buf) { + BlobInfo info, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { if (info.getBlobId().getName().endsWith(".part")) { // Kinda hacky but since we are creating multiple parts we will use a range // to ensure the customTimes are being calculated appropriately @@ -787,7 +803,7 @@ public BlobInfo internalDirectUpload( } else { assertThat(info.getCustomTimeOffsetDateTime()).isNull(); } - return super.internalDirectUpload(info, opts, buf); + return super.internalDirectUpload(info, opts, buf, null); } }; ParallelCompositeUploadWritableByteChannel pcu = @@ -843,7 +859,10 @@ private static class FakeStorageInternal implements StorageInternal { @Override public BlobInfo internalDirectUpload( - BlobInfo info, Opts opts, ByteBuffer buf) { + BlobInfo info, + Opts opts, + ByteBuffer buf, + OpenTelemetryTraceUtil.Context ctx) { BlobId id = info.getBlobId(); BlobInfo.Builder b = info.toBuilder(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/otel/TestExporter.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/otel/TestExporter.java new file mode 100644 index 0000000000..93ac4b3483 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/otel/TestExporter.java @@ -0,0 +1,50 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.otel; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +class TestExporter implements SpanExporter { + + public final List exportedSpans = Collections.synchronizedList(new ArrayList<>()); + + @Override + public CompletableResultCode export(Collection spans) { + exportedSpans.addAll(spans); + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return null; + } + + @Override + public CompletableResultCode shutdown() { + return null; + } + + public List getExportedSpans() { + return exportedSpans; + } +} From 38568547f3bb7cb3d792038c8c73d6f002a4ce48 Mon Sep 17 00:00:00 2001 From: Sydney Munro Date: Thu, 24 Oct 2024 13:58:14 -0700 Subject: [PATCH 2/6] fix checkstyle --- .../cloud/storage/GrpcOpenTelemetryTest.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/GrpcOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/GrpcOpenTelemetryTest.java index 5aaa20eacf..062352b270 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/GrpcOpenTelemetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/GrpcOpenTelemetryTest.java @@ -5,19 +5,20 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.google.cloud.storage; import com.google.api.core.ApiClock; +import com.google.auth.Credentials; +import com.google.cloud.NoCredentials; import com.google.cloud.storage.it.runner.StorageITRunner; import com.google.cloud.storage.it.runner.annotations.Backend; import com.google.cloud.storage.it.runner.annotations.Inject; @@ -66,6 +67,7 @@ public void setUp() { StorageOptions.grpc() .setHost(testBench.getGRPCBaseUri()) .setProjectId("projectId") + .setCredentials(NoCredentials.getInstance()) .setOpenTelemetrySdk(openTelemetrySdk) .build(); } From 8fe835b6af1ffc962e6a1aaa21de0457017563a3 Mon Sep 17 00:00:00 2001 From: Sydney Munro Date: Thu, 24 Oct 2024 15:04:28 -0700 Subject: [PATCH 3/6] linter --- ...GrpcOpenTelemetryTest.java => ITGrpcOpenTelemetryTest.java} | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) rename google-cloud-storage/src/test/java/com/google/cloud/storage/{GrpcOpenTelemetryTest.java => ITGrpcOpenTelemetryTest.java} (97%) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/GrpcOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java similarity index 97% rename from google-cloud-storage/src/test/java/com/google/cloud/storage/GrpcOpenTelemetryTest.java rename to google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java index 062352b270..64a7901b36 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/GrpcOpenTelemetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java @@ -17,7 +17,6 @@ package com.google.cloud.storage; import com.google.api.core.ApiClock; -import com.google.auth.Credentials; import com.google.cloud.NoCredentials; import com.google.cloud.storage.it.runner.StorageITRunner; import com.google.cloud.storage.it.runner.annotations.Backend; @@ -37,7 +36,7 @@ @RunWith(StorageITRunner.class) @SingleBackend(Backend.TEST_BENCH) -public class GrpcOpenTelemetryTest { +public class ITGrpcOpenTelemetryTest { @Inject public TestBench testBench; private static final ApiClock TIME_SOURCE = new ApiClock() { From d14b892d1b9af0f519c04c40555d86533a3c0610 Mon Sep 17 00:00:00 2001 From: Sydney Munro Date: Fri, 25 Oct 2024 10:43:41 -0700 Subject: [PATCH 4/6] fix child spans to have rpc.system information and proper formatting --- .../storage/otel/OpenTelemetryInstance.java | 9 +++--- .../storage/ITGrpcOpenTelemetryTest.java | 28 +++++++++++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryInstance.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryInstance.java index 9c71f4842a..d7ce734c07 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryInstance.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/otel/OpenTelemetryInstance.java @@ -156,7 +156,6 @@ public Scope makeCurrent() { public OpenTelemetryTraceUtil.Span startSpan(String methodName) { String formatSpanName = String.format("%s.%s/%s", "storage", "client", methodName); SpanBuilder spanBuilder = tracer.spanBuilder(formatSpanName).setSpanKind(SpanKind.CLIENT); - spanBuilder.setAttribute("rpc.system", transport); io.opentelemetry.api.trace.Span span = addSettingsAttributesToCurrentSpan(spanBuilder).startSpan(); return new Span(span, formatSpanName); @@ -164,16 +163,17 @@ public OpenTelemetryTraceUtil.Span startSpan(String methodName) { @Override public OpenTelemetryTraceUtil.Span startSpan( - String spanName, OpenTelemetryTraceUtil.Context parent) { + String methodName, OpenTelemetryTraceUtil.Context parent) { assert (parent instanceof OpenTelemetryInstance.Context); + String formatSpanName = String.format("%s.%s/%s", "storage", "client", methodName); SpanBuilder spanBuilder = tracer - .spanBuilder(spanName) + .spanBuilder(formatSpanName) .setSpanKind(SpanKind.CLIENT) .setParent(((OpenTelemetryInstance.Context) parent).context); io.opentelemetry.api.trace.Span span = addSettingsAttributesToCurrentSpan(spanBuilder).startSpan(); - return new Span(span, spanName); + return new Span(span, formatSpanName); } @Nonnull @@ -196,6 +196,7 @@ private SpanBuilder addSettingsAttributesToCurrentSpan(SpanBuilder spanBuilder) .put("gcp.client.version", GaxProperties.getLibraryVersion(this.getClass())) .put("gcp.client.repo", "googleapis/java-storage") .put("gcp.client.artifact", "com.google.cloud.google-cloud-storage") + .put("rpc.system", transport) .build()); return spanBuilder; } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java index 64a7901b36..e26f5fbc67 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java @@ -16,6 +16,8 @@ package com.google.cloud.storage; +import static java.nio.charset.StandardCharsets.UTF_8; + import com.google.api.core.ApiClock; import com.google.cloud.NoCredentials; import com.google.cloud.storage.it.runner.StorageITRunner; @@ -29,6 +31,7 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.List; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -86,6 +89,31 @@ public void runCreateBucket() { Assert.assertEquals("grpc", getAttributeValue(spanData, "rpc.system")); } + @Test + public void runCreateBlob() { + Storage storage = options.getService(); + String bucket = "random-bucket"; + storage.create(BucketInfo.of(bucket)); + byte[] content = "Hello, World!".getBytes(UTF_8); + BlobId toCreate = BlobId.of(bucket, "blob"); + storage.create(BlobInfo.newBuilder(toCreate).build(), content); + TestExporter testExported = (TestExporter) exporter; + List spanData = testExported.getExportedSpans(); + // (1) Span to create the bucket + // (2) Span when calling create + // (3) Span when passing call to internalDirectUpload + Assert.assertEquals(3, spanData.size()); + for(SpanData span : spanData) { + Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service")); + Assert.assertEquals("googleapis/java-storage", + getAttributeValue(span, "gcp.client.repo")); + Assert.assertEquals( + "com.google.cloud.google-cloud-storage", + getAttributeValue(span, "gcp.client.artifact")); + Assert.assertEquals("grpc", getAttributeValue(span, "rpc.system")); + } + } + private String getAttributeValue(SpanData spanData, String key) { return spanData.getAttributes().get(AttributeKey.stringKey(key)).toString(); } From 3543adf17383ca0f0a7bfa6317a08964a947c32f Mon Sep 17 00:00:00 2001 From: Sydney Munro Date: Fri, 25 Oct 2024 12:42:08 -0700 Subject: [PATCH 5/6] fix up tests --- .../storage/ITGrpcOpenTelemetryTest.java | 43 +++++++------------ 1 file changed, 15 insertions(+), 28 deletions(-) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java index e26f5fbc67..d3d8e61192 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java @@ -18,12 +18,12 @@ import static java.nio.charset.StandardCharsets.UTF_8; -import com.google.api.core.ApiClock; import com.google.cloud.NoCredentials; import com.google.cloud.storage.it.runner.StorageITRunner; import com.google.cloud.storage.it.runner.annotations.Backend; import com.google.cloud.storage.it.runner.annotations.Inject; import com.google.cloud.storage.it.runner.annotations.SingleBackend; +import com.google.cloud.storage.it.runner.registry.Generator; import com.google.cloud.storage.it.runner.registry.TestBench; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.sdk.OpenTelemetrySdk; @@ -41,23 +41,15 @@ @SingleBackend(Backend.TEST_BENCH) public class ITGrpcOpenTelemetryTest { @Inject public TestBench testBench; - private static final ApiClock TIME_SOURCE = - new ApiClock() { - @Override - public long nanoTime() { - return 42_000_000_000L; - } - - @Override - public long millisTime() { - return 42_000L; - } - }; private StorageOptions options; - private SpanExporter exporter = new TestExporter(); + private SpanExporter exporter; + private Storage storage; + @Inject public Generator generator; + @Inject public BucketInfo testBucket; @Before public void setUp() { + exporter = new TestExporter(); OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder() .setTracerProvider( @@ -72,11 +64,11 @@ public void setUp() { .setCredentials(NoCredentials.getInstance()) .setOpenTelemetrySdk(openTelemetrySdk) .build(); + storage = options.getService(); } @Test public void runCreateBucket() { - Storage storage = options.getService(); String bucket = "random-bucket"; storage.create(BucketInfo.of(bucket)); TestExporter testExported = (TestExporter) exporter; @@ -91,27 +83,22 @@ public void runCreateBucket() { @Test public void runCreateBlob() { - Storage storage = options.getService(); - String bucket = "random-bucket"; - storage.create(BucketInfo.of(bucket)); byte[] content = "Hello, World!".getBytes(UTF_8); - BlobId toCreate = BlobId.of(bucket, "blob"); + BlobId toCreate = BlobId.of(testBucket.getName(), generator.randomObjectName()); storage.create(BlobInfo.newBuilder(toCreate).build(), content); TestExporter testExported = (TestExporter) exporter; List spanData = testExported.getExportedSpans(); - // (1) Span to create the bucket - // (2) Span when calling create - // (3) Span when passing call to internalDirectUpload - Assert.assertEquals(3, spanData.size()); - for(SpanData span : spanData) { + // (1) Span when calling create + // (2) Span when passing call to internalDirectUpload + Assert.assertEquals(2, spanData.size()); + for (SpanData span : spanData) { Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service")); - Assert.assertEquals("googleapis/java-storage", - getAttributeValue(span, "gcp.client.repo")); + Assert.assertEquals("googleapis/java-storage", getAttributeValue(span, "gcp.client.repo")); Assert.assertEquals( - "com.google.cloud.google-cloud-storage", - getAttributeValue(span, "gcp.client.artifact")); + "com.google.cloud.google-cloud-storage", getAttributeValue(span, "gcp.client.artifact")); Assert.assertEquals("grpc", getAttributeValue(span, "rpc.system")); } + Assert.assertEquals(spanData.get(1).getSpanContext(), spanData.get(0).getParentSpanContext()); } private String getAttributeValue(SpanData spanData, String key) { From b4d259101a8be743d8c0a521023bc9fe1627c34e Mon Sep 17 00:00:00 2001 From: Sydney Munro Date: Mon, 28 Oct 2024 15:04:43 -0700 Subject: [PATCH 6/6] pr comments --- .../google/cloud/storage/GrpcStorageImpl.java | 19 ++++++--- ...lelCompositeUploadWritableByteChannel.java | 6 +-- .../google/cloud/storage/StorageInternal.java | 5 +++ .../storage/ITGrpcOpenTelemetryTest.java | 6 +-- ...ompositeUploadWritableByteChannelTest.java | 39 +++++-------------- 5 files changed, 35 insertions(+), 40 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 2735676f72..5a16b6c299 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -201,8 +201,7 @@ public void close() throws Exception { @Override public Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) { - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("create(Bucket, BucketTargetOption"); + OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("create"); Opts opts = Opts.unwrap(options).resolveFrom(bucketInfo).prepend(defaultOpts); GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); @@ -249,8 +248,7 @@ public Blob create( BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options) { Opts opts = Opts.unwrap(options).resolveFrom(blobInfo); // Start the otel span to retain information of the origin of the request - OpenTelemetryTraceUtil.Span otelSpan = - openTelemetryTraceUtil.startSpan("create(BlobInfo, BlobTargetOption"); + OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("create"); try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) { return internalDirectUpload( blobInfo, @@ -269,7 +267,8 @@ public Blob create( @Override public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) { - try { + OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("create"); + try (OpenTelemetryTraceUtil.Scope ununsed = otelSpan.makeCurrent()) { requireNonNull(blobInfo, "blobInfo must be non null"); InputStream inputStreamParam = firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES)); @@ -296,7 +295,11 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op ApiFuture responseApiFuture = session.getResult(); return this.getBlob(responseApiFuture); } catch (IOException | ApiException e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); throw StorageException.coalesce(e); + } finally { + otelSpan.end(); } } @@ -812,6 +815,12 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options hasher); } + @Override + public BlobInfo internalDirectUpload( + BlobInfo blobInfo, Opts opts, ByteBuffer buf) { + return internalDirectUpload(blobInfo, opts, buf, null); + } + @Override public BlobInfo internalDirectUpload( BlobInfo blobInfo, diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java index bb4edc2e3b..0d08759d1f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java @@ -219,8 +219,8 @@ public void close() throws IOException { // We never created any parts // create an empty object try { - BlobInfo blobInfo = - storage.internalDirectUpload(ultimateObject, opts, Buffers.allocate(0), null); + // TODO: Add in Otel context when available + BlobInfo blobInfo = storage.internalDirectUpload(ultimateObject, opts, Buffers.allocate(0)); finalObject.set(blobInfo); return; } catch (StorageException se) { @@ -287,7 +287,7 @@ private void internalFlush(ByteBuffer buf) { info -> { try { // TODO: Add in Otel context when available - return storage.internalDirectUpload(info, partOpts, buf, null); + return storage.internalDirectUpload(info, partOpts, buf); } catch (StorageException e) { // a precondition failure usually means the part was created, but we didn't get the // response. And when we tried to retry the object already exists. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java index 627654f56f..403f03e9ee 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java @@ -32,6 +32,11 @@ default BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts opts, ByteBuffer buf) { + throw new UnsupportedOperationException("not implemented"); + } + default BlobInfo internalDirectUpload( BlobInfo info, Opts opts, diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java index d3d8e61192..98d24ba159 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java @@ -88,9 +88,6 @@ public void runCreateBlob() { storage.create(BlobInfo.newBuilder(toCreate).build(), content); TestExporter testExported = (TestExporter) exporter; List spanData = testExported.getExportedSpans(); - // (1) Span when calling create - // (2) Span when passing call to internalDirectUpload - Assert.assertEquals(2, spanData.size()); for (SpanData span : spanData) { Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service")); Assert.assertEquals("googleapis/java-storage", getAttributeValue(span, "gcp.client.repo")); @@ -98,6 +95,9 @@ public void runCreateBlob() { "com.google.cloud.google-cloud-storage", getAttributeValue(span, "gcp.client.artifact")); Assert.assertEquals("grpc", getAttributeValue(span, "rpc.system")); } + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("create"))); + Assert.assertTrue( + spanData.stream().anyMatch(x -> x.getName().contains("internalDirectUpload"))); Assert.assertEquals(spanData.get(1).getSpanContext(), spanData.get(0).getParentSpanContext()); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java index 1e57b2a0b1..48bb4137e3 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannelTest.java @@ -39,7 +39,6 @@ import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; -import com.google.cloud.storage.otel.OpenTelemetryTraceUtil; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -357,12 +356,9 @@ public void partsRetainMetadata() throws Exception { new FakeStorageInternal() { @Override public BlobInfo internalDirectUpload( - BlobInfo info, - Opts opts, - ByteBuffer buf, - OpenTelemetryTraceUtil.Context ctx) { + BlobInfo info, Opts opts, ByteBuffer buf) { metadatas.add(info.getMetadata()); - return super.internalDirectUpload(info, opts, buf, null); + return super.internalDirectUpload(info, opts, buf); } @Override @@ -450,10 +446,7 @@ public void creatingAnEmptyObjectWhichFailsIsSetAsResultFailureAndThrowFromClose new FakeStorageInternal() { @Override public BlobInfo internalDirectUpload( - BlobInfo info, - Opts opts, - ByteBuffer buf, - OpenTelemetryTraceUtil.Context ctx) { + BlobInfo info, Opts opts, ByteBuffer buf) { throw StorageException.coalesce( ApiExceptionFactory.createException( null, GrpcStatusCode.of(Code.PERMISSION_DENIED), false)); @@ -564,10 +557,7 @@ public void shortCircuitExceptionResultsInFastFailure() throws Exception { new FakeStorageInternal() { @Override public BlobInfo internalDirectUpload( - BlobInfo info, - Opts opts, - ByteBuffer buf, - OpenTelemetryTraceUtil.Context ctx) { + BlobInfo info, Opts opts, ByteBuffer buf) { if (induceFailure.getAndSet(false)) { Uninterruptibles.awaitUninterruptibly(blockForWrite1); try { @@ -581,7 +571,7 @@ public BlobInfo internalDirectUpload( blockForWrite1Complete.countDown(); } } else { - return super.internalDirectUpload(info, opts, buf, null); + return super.internalDirectUpload(info, opts, buf); } } }; @@ -724,11 +714,8 @@ public void partFailedPreconditionOnRetryIsHandledGracefully() throws Exception new FakeStorageInternal() { @Override public BlobInfo internalDirectUpload( - BlobInfo info, - Opts opts, - ByteBuffer buf, - OpenTelemetryTraceUtil.Context ctx) { - BlobInfo blobInfo = super.internalDirectUpload(info, opts, buf, null); + BlobInfo info, Opts opts, ByteBuffer buf) { + BlobInfo blobInfo = super.internalDirectUpload(info, opts, buf); if (info.getName().equals(p1.getName())) { throw StorageException.coalesce( ApiExceptionFactory.createException( @@ -791,10 +778,7 @@ public void partMetadataFieldDecoratorUsesCustomTime() throws IOException { new FakeStorageInternal() { @Override public BlobInfo internalDirectUpload( - BlobInfo info, - Opts opts, - ByteBuffer buf, - OpenTelemetryTraceUtil.Context ctx) { + BlobInfo info, Opts opts, ByteBuffer buf) { if (info.getBlobId().getName().endsWith(".part")) { // Kinda hacky but since we are creating multiple parts we will use a range // to ensure the customTimes are being calculated appropriately @@ -803,7 +787,7 @@ public BlobInfo internalDirectUpload( } else { assertThat(info.getCustomTimeOffsetDateTime()).isNull(); } - return super.internalDirectUpload(info, opts, buf, null); + return super.internalDirectUpload(info, opts, buf); } }; ParallelCompositeUploadWritableByteChannel pcu = @@ -859,10 +843,7 @@ private static class FakeStorageInternal implements StorageInternal { @Override public BlobInfo internalDirectUpload( - BlobInfo info, - Opts opts, - ByteBuffer buf, - OpenTelemetryTraceUtil.Context ctx) { + BlobInfo info, Opts opts, ByteBuffer buf) { BlobId id = info.getBlobId(); BlobInfo.Builder b = info.toBuilder();