Skip to content

Commit

Permalink
census,observability: suppress message-events in traces when used by …
Browse files Browse the repository at this point in the history
…observability (#9485)
  • Loading branch information
sanjaypujare authored Aug 26, 2022
1 parent 96b9fc1 commit 6131a85
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 9 deletions.
13 changes: 10 additions & 3 deletions census/src/main/java/io/grpc/census/CensusTracingModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,12 @@ final class CensusTracingModule {
final Metadata.Key<SpanContext> tracingHeader;
private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
private final boolean addMessageEvents;

CensusTracingModule(
Tracer censusTracer, final BinaryFormat censusPropagationBinaryFormat) {
Tracer censusTracer,
final BinaryFormat censusPropagationBinaryFormat,
boolean addMessageEvents) {
this.censusTracer = checkNotNull(censusTracer, "censusTracer");
checkNotNull(censusPropagationBinaryFormat, "censusPropagationBinaryFormat");
this.tracingHeader =
Expand All @@ -114,6 +117,7 @@ public SpanContext parseBytes(byte[] serialized) {
}
}
});
this.addMessageEvents = addMessageEvents;
}

/**
Expand Down Expand Up @@ -211,9 +215,12 @@ private static EndSpanOptions createEndSpanOptions(
.build();
}

private static void recordMessageEvent(
private void recordMessageEvent(
Span span, MessageEvent.Type type,
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
if (!addMessageEvents) {
return;
}
MessageEvent.Builder eventBuilder = MessageEvent.builder(type, seqNo);
if (optionalUncompressedSize != -1) {
eventBuilder.setUncompressedMessageSize(optionalUncompressedSize);
Expand Down Expand Up @@ -282,7 +289,7 @@ void callEnded(io.grpc.Status status) {
}
}

private static final class ClientTracer extends ClientStreamTracer {
private final class ClientTracer extends ClientStreamTracer {
private final Span span;
final Metadata.Key<SpanContext> tracingHeader;
final boolean isSampledToLocalTracing;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,41 @@ private InternalCensusTracingAccessor() {
* Returns a {@link ClientInterceptor} with default tracing implementation.
*/
public static ClientInterceptor getClientInterceptor() {
return getClientInterceptor(true);
}

/**
* @param addMessageEvents add message events to Spans
*
* @return a {@link ClientInterceptor} with default tracing implementation.
*/
public static ClientInterceptor getClientInterceptor(
boolean addMessageEvents) {
CensusTracingModule censusTracing =
new CensusTracingModule(
Tracing.getTracer(),
Tracing.getPropagationComponent().getBinaryFormat());
Tracing.getPropagationComponent().getBinaryFormat(),
addMessageEvents);
return censusTracing.getClientInterceptor();
}

/**
* Returns a {@link ServerStreamTracer.Factory} with default stats implementation.
*/
public static ServerStreamTracer.Factory getServerStreamTracerFactory() {
return getServerStreamTracerFactory(true);
}

/**
* Returns a {@link ServerStreamTracer.Factory} with default stats implementation.
*/
public static ServerStreamTracer.Factory getServerStreamTracerFactory(
boolean addMessageEvents) {
CensusTracingModule censusTracing =
new CensusTracingModule(
Tracing.getTracer(),
Tracing.getPropagationComponent().getBinaryFormat());
Tracing.getPropagationComponent().getBinaryFormat(),
addMessageEvents);
return censusTracing.getServerTracerFactory();
}
}
4 changes: 2 additions & 2 deletions census/src/test/java/io/grpc/census/CensusModulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public void setUp() throws Exception {
new CensusStatsModule(
tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(),
true, true, true, false /* real-time */, true);
censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler);
censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler, true);
}

@After
Expand Down Expand Up @@ -1514,7 +1514,7 @@ public Long apply(AggregationData arg) {
});
}

private static class CallInfo<ReqT, RespT> extends ServerCallInfo<ReqT, RespT> {
static class CallInfo<ReqT, RespT> extends ServerCallInfo<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> methodDescriptor;
private final Attributes attributes;
private final String authority;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.census;

import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.census.CensusTracingModule.CallAttemptsTracerFactory;
import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder;
import io.grpc.internal.testing.StatsTestUtils.MockableSpan;
import io.grpc.testing.GrpcServerRule;
import io.opencensus.trace.MessageEvent;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanBuilder;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.propagation.BinaryFormat;
import java.io.InputStream;
import java.util.Random;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

/**
* Test for {@link CensusTracingModule}.
*/
@RunWith(JUnit4.class)
public class CensusTracingNoMessageEventTest {
private static final ClientStreamTracer.StreamInfo STREAM_INFO =
ClientStreamTracer.StreamInfo.newBuilder().build();

private static class StringInputStream extends InputStream {
final String string;

StringInputStream(String string) {
this.string = string;
}

@Override
public int read() {
// InProcessTransport doesn't actually read bytes from the InputStream. The InputStream is
// passed to the InProcess server and consumed by MARSHALLER.parse().
throw new UnsupportedOperationException("Should not be called");
}
}

private static final MethodDescriptor.Marshaller<String> MARSHALLER =
new MethodDescriptor.Marshaller<String>() {
@Override
public InputStream stream(String value) {
return new StringInputStream(value);
}

@Override
public String parse(InputStream stream) {
return ((StringInputStream) stream).string;
}
};

@Rule
public final MockitoRule mocks = MockitoJUnit.rule();

private final MethodDescriptor<String, String> method =
MethodDescriptor.<String, String>newBuilder()
.setType(MethodDescriptor.MethodType.UNKNOWN)
.setRequestMarshaller(MARSHALLER)
.setResponseMarshaller(MARSHALLER)
.setFullMethodName("package1.service2/method3")
.build();

private final FakeStatsRecorder statsRecorder = new FakeStatsRecorder();
private final Random random = new Random(1234);
private final Span spyClientSpan = spy(MockableSpan.generateRandomSpan(random));
private final Span spyAttemptSpan = spy(MockableSpan.generateRandomSpan(random));
private final SpanContext fakeAttemptSpanContext = spyAttemptSpan.getContext();
private final Span spyServerSpan = spy(MockableSpan.generateRandomSpan(random));
private final byte[] binarySpanContext = new byte[]{3, 1, 5};
private final SpanBuilder spyClientSpanBuilder = spy(new MockableSpan.Builder());
private final SpanBuilder spyAttemptSpanBuilder = spy(new MockableSpan.Builder());
private final SpanBuilder spyServerSpanBuilder = spy(new MockableSpan.Builder());

@Rule
public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();

@Mock
private Tracer tracer;
@Mock
private BinaryFormat mockTracingPropagationHandler;

@Captor
private ArgumentCaptor<MessageEvent> messageEventCaptor;

private CensusTracingModule censusTracing;

@Before
public void setUp() throws Exception {
when(spyClientSpanBuilder.startSpan()).thenReturn(spyClientSpan);
when(spyAttemptSpanBuilder.startSpan()).thenReturn(spyAttemptSpan);
when(tracer.spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), ArgumentMatchers.<Span>any()))
.thenReturn(spyClientSpanBuilder);
when(tracer.spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), ArgumentMatchers.<Span>any()))
.thenReturn(spyAttemptSpanBuilder);
when(spyServerSpanBuilder.startSpan()).thenReturn(spyServerSpan);
when(tracer.spanBuilderWithRemoteParent(anyString(), ArgumentMatchers.<SpanContext>any()))
.thenReturn(spyServerSpanBuilder);
when(mockTracingPropagationHandler.toByteArray(any(SpanContext.class)))
.thenReturn(binarySpanContext);
when(mockTracingPropagationHandler.fromByteArray(any(byte[].class)))
.thenReturn(fakeAttemptSpanContext);

censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler, false);
}

@After
public void wrapUp() {
assertNull(statsRecorder.pollRecord());
}

@Test
public void clientBasicTracingNoMessageEvents() {
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(null, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);

clientStreamTracer.outboundMessage(0);
clientStreamTracer.outboundMessageSent(0, 882, -1);
clientStreamTracer.inboundMessage(0);
clientStreamTracer.outboundMessage(1);
clientStreamTracer.outboundMessageSent(1, -1, 27);
clientStreamTracer.inboundMessageRead(0, 255, 90);

clientStreamTracer.streamClosed(Status.OK);
callTracer.callEnded(Status.OK);

InOrder inOrder = inOrder(spyClientSpan, spyAttemptSpan);
inOrder.verify(spyAttemptSpan, times(0)).addMessageEvent(messageEventCaptor.capture());
}

@Test
public void serverBasicTracingNoMessageEvents() {
ServerStreamTracer.Factory tracerFactory = censusTracing.getServerTracerFactory();
ServerStreamTracer serverStreamTracer =
tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());

serverStreamTracer.serverCallStarted(
new CensusModulesTest.CallInfo<>(method, Attributes.EMPTY, null));

serverStreamTracer.outboundMessage(0);
serverStreamTracer.outboundMessageSent(0, 882, -1);
serverStreamTracer.inboundMessage(0);
serverStreamTracer.outboundMessage(1);
serverStreamTracer.outboundMessageSent(1, -1, 27);
serverStreamTracer.inboundMessageRead(0, 255, 90);

serverStreamTracer.streamClosed(Status.CANCELLED);

InOrder inOrder = inOrder(spyServerSpan);
inOrder.verify(spyServerSpan, times(0)).addMessageEvent(messageEventCaptor.capture());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ private void setProducer(
}
if (config.isEnableCloudTracing()) {
clientInterceptors.add(
getConditionalInterceptor(InternalCensusTracingAccessor.getClientInterceptor()));
tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory());
getConditionalInterceptor(InternalCensusTracingAccessor.getClientInterceptor(false)));
tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory(false));
}

InternalGlobalInterceptors.setInterceptorsTracers(
Expand Down

0 comments on commit 6131a85

Please sign in to comment.