Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixSampleRateForQuery #4

Open
wants to merge 1 commit into
base: users/fabianm/AvailabilityStrategyFixWithQuery
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,28 @@

import com.azure.core.util.Context;
import com.azure.cosmos.implementation.ConsoleLoggingRegistryFactory;
import com.azure.cosmos.implementation.Exceptions;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosClientTelemetryConfig;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosMicrometerMetricsOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.rx.TestSuiteBase;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.micrometer.core.instrument.MeterRegistry;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -54,6 +62,17 @@ public void afterClass() {
this.client.close();
}

@DataProvider
public static Object[][] operationTypeProvider() {
return new Object[][]{
{ OperationType.Read },
{ OperationType.Replace },
{ OperationType.Create },
{ OperationType.Delete },
{ OperationType.Query }
};
}

public String resolveTestNameSuffix(Object[] row) {
return "";
}
Expand Down Expand Up @@ -179,12 +198,13 @@ public void defaultLoggerAndMetrics() {
// with custom appender
}

@Test(groups = { "simple", "emulator" }, timeOut = TIMEOUT)
public void delayedSampling() {
@Test(groups = { "simple", "emulator" }, dataProvider = "operationTypeProvider", timeOut = TIMEOUT)
public void delayedSampling(OperationType operationType) {
MeterRegistry meterRegistry = ConsoleLoggingRegistryFactory.create(1);

CapturingLogger capturingLogger = new CapturingLogger();
CosmosClientTelemetryConfig clientTelemetryCfg = new CosmosClientTelemetryConfig()
.diagnosticsHandler(CosmosDiagnosticsHandler.DEFAULT_LOGGING_HANDLER)
.diagnosticsHandler(capturingLogger)
.metricsOptions(new CosmosMicrometerMetricsOptions().meterRegistry(meterRegistry));

CosmosClientBuilder builder = this
Expand All @@ -197,17 +217,28 @@ public void delayedSampling() {
meterRegistry.clear();
meterRegistry.close();

String id = UUID.randomUUID().toString();
ObjectNode newItem = getDocumentDefinition(id);
container.createItem(newItem);

// change sample rate to 25%
clientTelemetryCfg.sampleDiagnostics(0.25);
executeTestCase(container);
executeDocumentOperation(container, operationType, id, newItem);

int loggedMessageSizeBefore = capturingLogger.getLoggedMessages().size();
// reduce sample rate to 0 - disable all diagnostics
clientTelemetryCfg.sampleDiagnostics(0);
executeTestCase(container);
executeDocumentOperation(container, operationType, id, newItem);
int loggedMessageSizeAfter = capturingLogger.getLoggedMessages().size();
// verify when sample rate is 0, the diagnostics will not be logged
assertThat(loggedMessageSizeBefore).isEqualTo(loggedMessageSizeAfter);

// set sample rate to 1 - enable all diagnostics (no sampling anymore)
clientTelemetryCfg.sampleDiagnostics(1);
executeTestCase(container);
executeDocumentOperation(container, operationType, id, newItem);
loggedMessageSizeAfter = capturingLogger.getLoggedMessages().size();
// Verify when sample rate is 1, the diagnostics will be logged
assertThat(loggedMessageSizeBefore + 1).isEqualTo(loggedMessageSizeAfter);

// no assertions here - invocations for diagnostics handler are validated above
// log4j event logging isn't validated in general in unit tests because it is too brittle to do so
Expand All @@ -232,6 +263,49 @@ public void defaultLoggerWithLegacyOpenTelemetryTraces() {
System.setProperty("COSMOS.USE_LEGACY_TRACING", "false");
}

private void executeDocumentOperation(
CosmosContainer cosmosContainer,
OperationType operationType,
String createdItemId,
ObjectNode createdItem) {
switch (operationType) {
case Query:
String query = String.format("SELECT * from c");
CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions();
queryRequestOptions.setFeedRange(FeedRange.forLogicalPartition(new PartitionKey(createdItemId)));
Iterable<FeedResponse<JsonNode>> results = cosmosContainer.queryItems(query, queryRequestOptions, JsonNode.class).iterableByPage();
results.forEach(t -> {});
break;
case ReadFeed:
CosmosChangeFeedRequestOptions changeFeedRequestOptions = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
cosmosContainer.queryChangeFeed(changeFeedRequestOptions, JsonNode.class).iterableByPage();
break;
case Read:
cosmosContainer
.readItem(createdItemId, new PartitionKey(createdItemId), JsonNode.class);
break;
case Replace:
cosmosContainer
.replaceItem(createdItem, createdItemId, new PartitionKey(createdItemId), new CosmosItemRequestOptions());
break;
case Delete:
try {
cosmosContainer.deleteItem(getDocumentDefinition(UUID.randomUUID().toString()), new CosmosItemRequestOptions());
} catch (CosmosException e) {
if (!Exceptions.isNotFound(e)) {
throw e;
}
}
break;
case Create:
cosmosContainer.createItem(getDocumentDefinition(UUID.randomUUID().toString()));
break;
default:
throw new IllegalArgumentException("The operation type is not supported");
}
}

private void executeTestCase(CosmosContainer container) {
String id = UUID.randomUUID().toString();
CosmosItemResponse<ObjectNode> response = container.createItem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,18 +600,14 @@ public <T> Mono<CosmosItemResponse<T>> traceEnabledCosmosItemResponsePublisher(
* @param publisher publisher to run.
* @return wrapped publisher.
*/
public <T> Flux<T> runUnderSpanInContext(Flux<T> publisher, CosmosPagedFluxOptions options) {
public <T> Flux<T> runUnderSpanInContext(Flux<T> publisher) {
return propagatingFlux.flatMap(ignored -> publisher);
}

public boolean shouldSampleOutOperation(CosmosPagedFluxOptions options) {
final double samplingRateSnapshot = clientTelemetryConfigAccessor.getSamplingRate(this.telemetryConfig);

options.setSamplingRateSnapshot(samplingRateSnapshot);

if (shouldSampleOutOperation(samplingRateSnapshot)) {
return publisher;
}

return propagatingFlux
.flatMap(ignored -> publisher);
return shouldSampleOutOperation(samplingRateSnapshot);
}

private boolean shouldSampleOutOperation(double samplingRate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,94 @@ private CosmosPagedFluxOptions createCosmosPagedFluxOptions() {
return cosmosPagedFluxOptions;
}

private <TOutput> Flux<TOutput> wrapWithTracingIfEnabled(CosmosPagedFluxOptions pagedFluxOptions, Flux<TOutput> publisher) {
private Flux<FeedResponse<T>> wrapWithTracingIfEnabled(
CosmosPagedFluxOptions pagedFluxOptions,
Flux<FeedResponse<T>> publisher,
AtomicLong feedResponseConsumerLatencyInNanos,
Context context) {
DiagnosticsProvider tracerProvider = pagedFluxOptions.getDiagnosticsProvider();
if (tracerProvider == null ||
!tracerProvider.isEnabled()) {

return publisher;
}

return tracerProvider.runUnderSpanInContext(publisher, pagedFluxOptions);
if (tracerProvider.shouldSampleOutOperation(pagedFluxOptions)) {
return publisher;
}

final CosmosDiagnosticsContext cosmosCtx = ctxAccessor.create(
pagedFluxOptions.getSpanName(),
pagedFluxOptions.getAccountTag(),
BridgeInternal.getServiceEndpoint(pagedFluxOptions.getCosmosAsyncClient()),
pagedFluxOptions.getDatabaseId(),
pagedFluxOptions.getContainerId(),
pagedFluxOptions.getResourceType(),
pagedFluxOptions.getOperationType(),
pagedFluxOptions.getOperationId(),
pagedFluxOptions.getEffectiveConsistencyLevel(),
pagedFluxOptions.getMaxItemCount(),
pagedFluxOptions.getDiagnosticsThresholds(),
null,
pagedFluxOptions.getConnectionMode(),
pagedFluxOptions.getUserAgent());
ctxAccessor.setSamplingRateSnapshot(cosmosCtx, pagedFluxOptions.getSamplingRateSnapshot());

Flux<FeedResponse<T>> result = tracerProvider
.runUnderSpanInContext(publisher)
.doOnEach(signal -> {
FeedResponse<T> response = signal.get();
Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(signal.getContextView());
switch (signal.getType()) {
case ON_COMPLETE:
this.recordFeedResponse(pagedFluxOptions, traceCtx, tracerProvider, response, feedResponseConsumerLatencyInNanos);

tracerProvider.recordFeedResponseConsumerLatency(
signal,
Duration.ofNanos(feedResponseConsumerLatencyInNanos.get()));

tracerProvider.endSpan(traceCtx);

break;
case ON_NEXT:
this.recordFeedResponse(pagedFluxOptions, traceCtx, tracerProvider, response, feedResponseConsumerLatencyInNanos);

break;

case ON_ERROR:
tracerProvider.recordFeedResponseConsumerLatency(
signal,
Duration.ofNanos(feedResponseConsumerLatencyInNanos.get()));

// all info is extracted from CosmosException when applicable
tracerProvider.endSpan(
traceCtx,
signal.getThrowable()
);

break;

default:
break;
}
});

return Flux
.deferContextual(reactorCtx -> result
.doOnCancel(() -> {
Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx);
tracerProvider.endSpan(traceCtx);
})
.doOnComplete(() -> {
Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx);
tracerProvider.endSpan(traceCtx);
}))
.contextWrite(
DiagnosticsProvider.setContextInReactor(
tracerProvider.startSpan(
pagedFluxOptions.getSpanName(),
cosmosCtx,
context)));
}

private void recordFeedResponse(
Expand Down Expand Up @@ -203,90 +282,15 @@ private Flux<FeedResponse<T>> byPage(CosmosPagedFluxOptions pagedFluxOptions, Co
AtomicReference<Instant> startTime = new AtomicReference<>();
AtomicLong feedResponseConsumerLatencyInNanos = new AtomicLong(0);

Flux<FeedResponse<T>> result =
wrapWithTracingIfEnabled(
pagedFluxOptions, this.optionsFluxFunction.apply(pagedFluxOptions))
.doOnSubscribe(ignoredValue -> {
startTime.set(Instant.now());
feedResponseConsumerLatencyInNanos.set(0);
})
.doOnEach(signal -> {

FeedResponse<T> response = signal.get();
Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(signal.getContextView());
DiagnosticsProvider tracerProvider = pagedFluxOptions.getDiagnosticsProvider();
switch (signal.getType()) {
case ON_COMPLETE:
this.recordFeedResponse(pagedFluxOptions, traceCtx, tracerProvider, response, feedResponseConsumerLatencyInNanos);

if (isTracerEnabled(tracerProvider)) {
tracerProvider.recordFeedResponseConsumerLatency(
signal,
Duration.ofNanos(feedResponseConsumerLatencyInNanos.get()));

tracerProvider.endSpan(traceCtx);
}

break;
case ON_NEXT:
this.recordFeedResponse(pagedFluxOptions, traceCtx, tracerProvider, response, feedResponseConsumerLatencyInNanos);

break;

case ON_ERROR:
if (isTracerEnabled(tracerProvider)) {
tracerProvider.recordFeedResponseConsumerLatency(
signal,
Duration.ofNanos(feedResponseConsumerLatencyInNanos.get()));

// all info is extracted from CosmosException when applicable
tracerProvider.endSpan(
traceCtx,
signal.getThrowable()
);
}

break;

default:
break;
}});


final DiagnosticsProvider tracerProvider = pagedFluxOptions.getDiagnosticsProvider();
final CosmosDiagnosticsContext cosmosCtx = ctxAccessor.create(
pagedFluxOptions.getSpanName(),
pagedFluxOptions.getAccountTag(),
BridgeInternal.getServiceEndpoint(pagedFluxOptions.getCosmosAsyncClient()),
pagedFluxOptions.getDatabaseId(),
pagedFluxOptions.getContainerId(),
pagedFluxOptions.getResourceType(),
pagedFluxOptions.getOperationType(),
pagedFluxOptions.getOperationId(),
pagedFluxOptions.getEffectiveConsistencyLevel(),
pagedFluxOptions.getMaxItemCount(),
pagedFluxOptions.getDiagnosticsThresholds(),
null,
pagedFluxOptions.getConnectionMode(),
pagedFluxOptions.getUserAgent());
ctxAccessor.setSamplingRateSnapshot(cosmosCtx, pagedFluxOptions.getSamplingRateSnapshot());
pagedFluxOptions.setDiagnosticsContext(cosmosCtx);

return Flux
.deferContextual(reactorCtx -> result
.doOnCancel(() -> {
Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx);
tracerProvider.endSpan(traceCtx);
})
.doOnComplete(() -> {
Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx);
tracerProvider.endSpan(traceCtx);
}))
.contextWrite(DiagnosticsProvider.setContextInReactor(
pagedFluxOptions.getDiagnosticsProvider().startSpan(
pagedFluxOptions.getSpanName(),
cosmosCtx,
context)));
return wrapWithTracingIfEnabled(
pagedFluxOptions,
this.optionsFluxFunction.apply(pagedFluxOptions),
feedResponseConsumerLatencyInNanos,
context)
.doOnSubscribe(ignoredValue -> {
startTime.set(Instant.now());
feedResponseConsumerLatencyInNanos.set(0);
});
}

private boolean isTracerEnabled(DiagnosticsProvider tracerProvider) {
Expand Down
Loading