From 56be0d7f77b3d132846eb7c9d8a98de1ca3e52b5 Mon Sep 17 00:00:00 2001
From: David Noble
Date: Tue, 17 Mar 2020 16:21:39 -0700
Subject: [PATCH] Completed port from v4. Unit tests pass locally. Next up:
tests on cloud.
---
.../data/cosmos/internal/RequestTimeline.java | 195 +++++
.../RntbdTransportClient.java | 88 ++-
.../rntbd/RntbdClientChannelHandler.java | 6 +-
.../RntbdClientChannelHealthChecker.java | 171 ++---
.../rntbd/RntbdConstants.java | 690 +++++++++++-------
.../rntbd/RntbdContext.java | 14 +-
.../rntbd/RntbdContextRequest.java | 4 +-
.../rntbd/RntbdContextRequestEncoder.java | 36 +-
.../rntbd/RntbdMetrics.java | 187 +++--
.../rntbd/RntbdRequestArgs.java | 44 +-
.../rntbd/RntbdRequestDecoder.java | 16 +-
.../rntbd/RntbdRequestEncoder.java | 23 +-
.../rntbd/RntbdRequestFrame.java | 2 +-
.../rntbd/RntbdRequestHeaders.java | 44 +-
.../rntbd/RntbdRequestManager.java | 57 +-
.../rntbd/RntbdRequestRecord.java | 187 ++++-
.../rntbd/RntbdResponse.java | 245 ++++---
.../rntbd/RntbdResponseDecoder.java | 17 +-
.../rntbd/RntbdServiceEndpoint.java | 16 +-
.../directconnectivity/rntbd/RntbdToken.java | 38 +-
.../rntbd/RntbdTokenStream.java | 58 +-
.../rntbd/RntbdTokenType.java | 2 +-
22 files changed, 1413 insertions(+), 727 deletions(-)
create mode 100644 sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RequestTimeline.java
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RequestTimeline.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RequestTimeline.java
new file mode 100644
index 0000000000000..acf582d960d30
--- /dev/null
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/RequestTimeline.java
@@ -0,0 +1,195 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.data.cosmos.internal;
+
+import com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdObjectMapper;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
+import com.google.common.collect.ImmutableList;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.util.Iterator;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Represents the startTime and duration of important events in the lifetime of a request.
+ *
+ * A {@link RequestTimeline} represents a timeline as a sequence of {@link Event} instances with name, startTime, and
+ * duration properties. Hence, one might use this class to represent any timeline. Today we use it to represent
+ * request timelines for:
+ *
+ * A {@link RequestTimeline} serializes to JSON as an array of {@link Event} instances. This is the default
+ * serialization for any class that implements {@link Iterable}.
+ *
+ */
+public final class RequestTimeline implements Iterable {
+
+ private static final RequestTimeline EMPTY = new RequestTimeline();
+ private final ImmutableList events;
+
+ private RequestTimeline() {
+ this.events = ImmutableList.of();
+ }
+
+ private RequestTimeline(final ImmutableList events) {
+ checkNotNull(events, "expected non-null events");
+ this.events = events;
+ }
+
+ /**
+ * Returns an empty {@link RequestTimeline}.
+ *
+ * The empty startTime line returned is static.
+ *
+ * @return an empty {@link RequestTimeline}.
+ */
+ public static RequestTimeline empty() {
+ return EMPTY;
+ }
+
+ /**
+ * Returns an iterator for enumerating the {@link Event} instances in this {@link RequestTimeline}.
+ *
+ * @return an iterator for enumerating the {@link Event} instances in this {@link RequestTimeline}.
+ */
+ @Override
+ public Iterator iterator() {
+ return this.events.iterator();
+ }
+
+ /**
+ * Returns an empty {@link RequestTimeline}.
+ *
+ * The empty startTime line returned is static and equivalent to calling {@link RequestTimeline#empty}.
+ *
+ * @return an empty request timeline.
+ */
+ public static RequestTimeline of() {
+ return EMPTY;
+ }
+
+ /**
+ * Returns a new {@link RequestTimeline} with a single event.
+ *
+ * @return a new {@link RequestTimeline} with a single event.
+ */
+ public static RequestTimeline of(final Event event) {
+ return new RequestTimeline(ImmutableList.of(event));
+ }
+
+ /**
+ * Returns a new {@link RequestTimeline} with a pair of events.
+ *
+ * @return a new {@link RequestTimeline} with a pair of events.
+ */
+ public static RequestTimeline of(final Event e1, final Event e2) {
+ return new RequestTimeline(ImmutableList.of(e1, e2));
+ }
+
+ /**
+ * Returns a new {@link RequestTimeline} with three events.
+ *
+ * @return a new {@link RequestTimeline} with three events.
+ */
+ public static RequestTimeline of(final Event e1, final Event e2, final Event e3) {
+ return new RequestTimeline(ImmutableList.of(e1, e2, e3));
+ }
+
+ /**
+ * Returns a new {@link RequestTimeline} with four events.
+ *
+ * @return a new {@link RequestTimeline} with four events.
+ */
+ public static RequestTimeline of(final Event e1, final Event e2, final Event e3, final Event e4) {
+ return new RequestTimeline(ImmutableList.of(e1, e2, e3, e4));
+ }
+
+ /**
+ * Returns a new {@link RequestTimeline} with five events.
+ *
+ * @return a new {@link RequestTimeline} with five events.
+ */
+ public static RequestTimeline of(final Event e1, final Event e2, final Event e3, final Event e4, final Event e5) {
+ return new RequestTimeline(ImmutableList.of(e1, e2, e3, e4, e5));
+ }
+
+ /**
+ * Returns a new {@link RequestTimeline} with an arbitrary number of events.
+ *
+ * @return a new {@link RequestTimeline} with an arbitrary number of events.
+ */
+ public static RequestTimeline of(final Event... events) {
+ return new RequestTimeline(ImmutableList.copyOf(events));
+ }
+
+ /**
+ * Returns a textual representation of this {@link RequestTimeline}.
+ *
+ * The textual representation returned is a string of the form {@code RequestTimeline(} <event-array>
+ * {@code )}.
+ */
+ @Override
+ public String toString() {
+ return RntbdObjectMapper.toString(this);
+ }
+
+ @JsonPropertyOrder({ "name", "startTime", "durationInMicroSec" })
+ public static final class Event {
+
+ @JsonIgnore
+ private final Duration duration;
+
+ @JsonSerialize(using = ToStringSerializer.class)
+ private final long durationInMicroSec;
+
+ @JsonProperty("eventName")
+ private final String name;
+
+ @JsonSerialize(using = ToStringSerializer.class)
+ private final OffsetDateTime startTime;
+
+ public Event(final String name, final OffsetDateTime from, final OffsetDateTime to) {
+
+ checkNotNull(name, "expected non-null name");
+
+ this.name = name;
+ this.startTime = from;
+
+ this.duration = from == null ? null : to == null ? Duration.ZERO : Duration.between(from, to);
+ if(this.duration != null) {
+ this.durationInMicroSec = duration.toNanos()/1000L;
+ } else {
+ this.durationInMicroSec = 0;
+ }
+ }
+
+ public Duration getDuration() {
+ return this.duration;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public OffsetDateTime getStartTime() {
+ return startTime;
+ }
+ }
+}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClient.java
index ea42e143ae13a..d2c89d73aaf77 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClient.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/RntbdTransportClient.java
@@ -3,6 +3,7 @@
package com.azure.data.cosmos.internal.directconnectivity;
+import com.azure.data.cosmos.BridgeInternal;
import com.azure.data.cosmos.internal.Configs;
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.UserAgentContainer;
@@ -23,7 +24,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.SignalType;
import java.io.File;
import java.io.IOException;
@@ -122,14 +122,19 @@ public Mono invokeStoreAsync(final URI address, final RxDocumentS
logger.debug("RntbdTransportClient.invokeStoreAsync({}, {}): {}", address, request, record);
- return Mono.fromFuture(record).doFinally(signalType -> {
- logger.debug("SignalType.{} received from reactor: {\n endpoint: {},\n record: {}\n}",
- signalType.name(),
- endpoint,
- record);
- if (signalType == SignalType.CANCEL) {
- record.stage(RntbdRequestRecord.Stage.CANCELLED_BY_CLIENT);
+ return Mono.fromFuture(record.whenComplete((response, throwable) -> {
+
+ record.stage(RntbdRequestRecord.Stage.COMPLETED);
+
+ if (request.requestContext.cosmosResponseDiagnostics == null) {
+ request.requestContext.cosmosResponseDiagnostics = BridgeInternal.createCosmosResponseDiagnostics();
+ }
+
+ if(response != null) {
+ logger.debug("%s", record.takeTimelineSnapshot());
}
+ })).doOnCancel(() -> {
+ logger.debug("REQUEST CANCELLED: {}", record);
});
}
@@ -319,6 +324,48 @@ public String toString() {
// region Types
+ /**
+ * A builder for constructing {@link Options} instances.
+ *
+ *
Using system properties to set the default {@link Options} used by an {@link Builder}
+ *
+ * A default options instance is created when the {@link Builder} class is initialized. This instance specifies
+ * the default options used by every {@link Builder} instance. In priority order the default options instance
+ * is created from:
+ *
+ *
The JSON value of system property {@code azure.cosmos.directTcp.defaultOptions}.
+ *
JSON value errors are logged and then ignored. If none of the above values are available or all available
+ * values are in error, the default options instance is created from the private parameterless constructor for
+ * {@link Options}.
+ */
@SuppressWarnings("UnusedReturnValue")
public static class Builder {
@@ -329,16 +376,6 @@ public static class Builder {
static {
- // In priority order we take default Direct TCP options from:
- //
- // 1. the string value of system property "azure.cosmos.directTcp.options", or
- // 2. the contents of the file located by the system property "azure.cosmos.directTcp.optionsFile", or
- // 3. the contents of the resource file named "azure.cosmos.directTcp.options.json"
- //
- // Otherwise, if none of these values are set or an error occurs we create default options based on a
- // set of hard-wired values defined in the default private parameterless constructor for
- // RntbdTransportClient.Options.
-
Options options = null;
try {
@@ -373,7 +410,7 @@ public static class Builder {
final ClassLoader loader = RntbdTransportClient.class.getClassLoader();
final String name = DEFAULT_OPTIONS_PROPERTY_NAME + ".json";
- try (final InputStream stream = loader.getResourceAsStream(name)) {
+ try (InputStream stream = loader.getResourceAsStream(name)) {
if (stream != null) {
// Attempt to load default options from the JSON resource file "{propertyName}.json"
options = RntbdObjectMapper.readValue(stream, Options.class);
@@ -383,7 +420,14 @@ public static class Builder {
}
}
} finally {
- DEFAULT_OPTIONS = options != null ? options : new Options();
+ if (options == null) {
+ DEFAULT_OPTIONS = new Options();
+ } else {
+ logger.info("Updated default Direct TCP options from system property {}: {}",
+ DEFAULT_OPTIONS_PROPERTY_NAME,
+ options);
+ DEFAULT_OPTIONS = options;
+ }
}
}
@@ -553,7 +597,9 @@ public Builder userAgent(final UserAgentContainer value) {
static final class JsonSerializer extends StdSerializer {
- public JsonSerializer() {
+ private static final long serialVersionUID = 1007663695768825670L;
+
+ JsonSerializer() {
super(RntbdTransportClient.class);
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java
index bd5bcc38d3f7e..e56559ff3aed2 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHandler.java
@@ -44,7 +44,7 @@ public class RntbdClientChannelHandler extends ChannelInitializer imple
*/
@Override
public void channelAcquired(final Channel channel) {
- logger.trace("{} CHANNEL ACQUIRED", channel);
+ logger.debug("{} CHANNEL ACQUIRED", channel);
}
/**
@@ -56,7 +56,7 @@ public void channelAcquired(final Channel channel) {
*/
@Override
public void channelCreated(final Channel channel) {
- logger.trace("{} CHANNEL CREATED", channel);
+ logger.debug("{} CHANNEL CREATED", channel);
this.initChannel(channel);
}
@@ -69,7 +69,7 @@ public void channelCreated(final Channel channel) {
*/
@Override
public void channelReleased(final Channel channel) {
- logger.trace("{} CHANNEL RELEASED", channel);
+ logger.debug("{} CHANNEL RELEASED", channel);
}
/**
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java
index c1020f01988af..de354274a58a5 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java
@@ -4,10 +4,7 @@
package com.azure.data.cosmos.internal.directconnectivity.rntbd;
import com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdEndpoint.Config;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.fasterxml.jackson.annotation.JsonProperty;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.util.concurrent.Future;
@@ -15,7 +12,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -24,7 +20,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
-@JsonSerialize(using = RntbdClientChannelHealthChecker.JsonSerializer.class)
public final class RntbdClientChannelHealthChecker implements ChannelHealthChecker {
// region Fields
@@ -43,22 +38,17 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck
// A channel will not be declared unhealthy if a write was attempted recently. As such gaps between
// Timestamps.lastChannelWriteAttempt and Timestamps.lastChannelWrite lower than this value are ignored.
- // Guidance: The grace period should be large enough to accommodate slow writes. For example, a value of 2s requires
- // that the client can sustain data rates of at least 1 MB/s when writing 2 MB documents.
+ // Guidance: The grace period should be large enough to accommodate slow writes. For example, a value of 2s
+ // requires that the client can sustain data rates of at least 1 MB/s when writing 2 MB documents.
private static final long writeHangGracePeriodInNanos = 2L * 1_000_000_000L;
- // A channel is considered idle if:
- // idleConnectionTimeout > 0L && System.nanoTime() - Timestamps.lastChannelRead() >= idleConnectionTimeout
+ @JsonProperty
private final long idleConnectionTimeoutInNanos;
- // A channel will be declared unhealthy if the gap between Timestamps.lastChannelWrite and Timestamps.lastChannelRead
- // grows beyond this value.
- // Constraint: readDelayLimit > readHangGracePeriod
+ @JsonProperty
private final long readDelayLimitInNanos;
- // A channel will be declared unhealthy if the gap between Timestamps.lastChannelWriteAttempt and Timestamps.lastChannelWrite
- // grows beyond this value.
- // Constraint: writeDelayLimit > writeHangGracePeriod
+ @JsonProperty
private final long writeDelayLimitInNanos;
// endregion
@@ -67,33 +57,72 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck
public RntbdClientChannelHealthChecker(final Config config) {
- checkNotNull(config, "config: null");
+ checkNotNull(config, "expected non-null config");
- this.idleConnectionTimeoutInNanos = config.idleConnectionTimeoutInNanos();
+ checkArgument(config.receiveHangDetectionTimeInNanos() > readHangGracePeriodInNanos,
+ "config.receiveHangDetectionTimeInNanos: %s",
+ config.receiveHangDetectionTimeInNanos());
- this.readDelayLimitInNanos = config.receiveHangDetectionTimeInNanos();
- checkArgument(this.readDelayLimitInNanos > readHangGracePeriodInNanos, "config.receiveHangDetectionTimeInNanos: %s", this.readDelayLimitInNanos);
+ checkArgument(config.sendHangDetectionTimeInNanos() > writeHangGracePeriodInNanos,
+ "config.sendHangDetectionTimeInNanos: %s",
+ config.sendHangDetectionTimeInNanos());
+ this.idleConnectionTimeoutInNanos = config.idleConnectionTimeoutInNanos();
+ this.readDelayLimitInNanos = config.receiveHangDetectionTimeInNanos();
this.writeDelayLimitInNanos = config.sendHangDetectionTimeInNanos();
- checkArgument(this.writeDelayLimitInNanos > writeHangGracePeriodInNanos, "config.sendHangDetectionTimeInNanos: %s", this.writeDelayLimitInNanos);
+
}
// endregion
// region Methods
+ /**
+ * Returns the idle connection timeout interval in nanoseconds.
+ *
+ * A channel is considered idle if {@link #idleConnectionTimeoutInNanos} is greater than zero and the time since
+ * the last channel read is greater than {@link #idleConnectionTimeoutInNanos}.
+ *
+ * @return Idle connection timeout interval in nanoseconds.
+ */
public long idleConnectionTimeoutInNanos() {
return this.idleConnectionTimeoutInNanos;
}
+ /**
+ * Returns the read delay limit in nanoseconds.
+ *
+ * A channel will be declared unhealthy if the gap between the last channel write and the last channel read grows
+ * beyond this value.
+ *
+ * Constraint: {@link #readDelayLimitInNanos} > {@link #readHangGracePeriodInNanos}
+ *
+ * @return Read delay limit in nanoseconds.
+ */
public long readDelayLimitInNanos() {
return this.readDelayLimitInNanos;
}
+ /**
+ * Returns the write delay limit in nanoseconds.
+ *
+ * A channel will be declared unhealthy if the gap between the last channel write attempt and the last channel write
+ * grows beyond this value.
+ *
- * If {@code false} this message should be passed to the next @{link ChannelOutboundHandler} in the pipeline.
+ * If {@code false} this message should be passed to the next {@link io.netty.channel.ChannelHandlerContext} in the
+ * pipeline.
*
- * @param message the message to encode
- * @return @{code true}, if the given message is an an @{link RntbdContextRequest} instance; otherwise @{false}
+ * @param message the message to encode.
+ *
+ * @return {@code true}, if the given message is an an @{link RntbdContextRequest} instance; {@code false}
+ * otherwise.
*/
@Override
public boolean acceptOutboundMessage(final Object message) {
- return message instanceof RntbdContextRequest;
+ return message.getClass() == RntbdContextRequest.class;
}
/**
- * Encode an @{link RntbdContextRequest} message into a {@link ByteBuf}
+ * Encode an {@link RntbdContextRequest} message into a {@link ByteBuf}.
*
* This method will be called for each written message that can be handled by this encoder.
*
- * @param context the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to
- * @param message the message to encode
- * @param out the {@link ByteBuf} into which the encoded message will be written
- * @throws IllegalStateException is thrown if an error occurs
+ * @param context the {@link ChannelHandlerContext} to which this {@link MessageToByteEncoder} belongs.
+ * @param message the message to encode.
+ * @param out the {@link ByteBuf} into which the encoded message will be written.
+ *
+ * @throws IllegalStateException is thrown if an error occurs.
*/
@Override
- protected void encode(final ChannelHandlerContext context, final Object message, final ByteBuf out) throws IllegalStateException {
+ protected void encode(
+ final ChannelHandlerContext context,
+ final RntbdContextRequest message,
+ final ByteBuf out) throws IllegalStateException {
- final RntbdContextRequest request = (RntbdContextRequest)message;
out.markWriterIndex();
try {
- request.encode(out);
+ message.encode(out);
} catch (final IllegalStateException error) {
out.resetWriterIndex();
throw error;
}
- Logger.debug("{}: ENCODE COMPLETE: request={}", context.channel(), request);
+ Logger.debug("{}: ENCODE COMPLETE: message={}", context.channel(), message);
}
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdMetrics.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdMetrics.java
index 090bfc46dfe07..cae62f18d4cbd 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdMetrics.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdMetrics.java
@@ -6,23 +6,25 @@
import com.azure.data.cosmos.internal.directconnectivity.RntbdTransportClient;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.google.common.net.PercentEscaper;
import io.micrometer.core.instrument.Clock;
+import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
-import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.config.NamingConvention;
+import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.dropwizard.DropwizardConfig;
import io.micrometer.core.instrument.dropwizard.DropwizardMeterRegistry;
import io.micrometer.core.instrument.util.HierarchicalNameMapper;
import io.micrometer.core.lang.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.concurrent.TimeUnit;
@@ -30,32 +32,38 @@
@SuppressWarnings("UnstableApiUsage")
@JsonPropertyOrder({
"tags", "concurrentRequests", "requests", "responseErrors", "responseSuccesses", "completionRate", "responseRate",
- "channelsAcquired", "channelsAvailable", "requestQueueLength", "usedDirectMemory", "usedHeapMemory"
+ "requestSize", "responseSize", "channelsAcquired", "channelsAvailable", "requestQueueLength", "usedDirectMemory",
+ "usedHeapMemory"
})
public final class RntbdMetrics {
// region Fields
- private static final PercentEscaper escaper = new PercentEscaper("_-", false);
+ private static final PercentEscaper PERCENT_ESCAPER = new PercentEscaper("_-", false);
+
+ private static final Logger logger = LoggerFactory.getLogger(RntbdMetrics.class);
private static final CompositeMeterRegistry registry = new CompositeMeterRegistry();
- private static final String prefix = "azure.cosmos.directTcp.";
- private static MeterRegistry consoleLoggingRegistry;
+ static {
+ try {
+ int step = Integer.getInteger("azure.cosmos.monitoring.consoleLogging.step", 0);
+ if (step > 0) {
+ RntbdMetrics.add(RntbdMetrics.consoleLoggingRegistry(step));
+ }
+ } catch (Throwable error) {
+ logger.error("failed to initialize console logging registry due to ", error);
+ }
+ }
- private final RntbdTransportClient transportClient;
private final RntbdEndpoint endpoint;
+ private final DistributionSummary requestSize;
private final Timer requests;
private final Timer responseErrors;
+ private final DistributionSummary responseSize;
private final Timer responseSuccesses;
private final Tags tags;
-
- static {
- int step = Integer.getInteger("azure.cosmos.monitoring.consoleLogging.step", 0);
- if (step > 0) {
- RntbdMetrics.add(RntbdMetrics.consoleLoggingRegistry(step));
- }
- }
+ private final RntbdTransportClient transportClient;
// endregion
@@ -67,6 +75,7 @@ public RntbdMetrics(RntbdTransportClient client, RntbdEndpoint endpoint) {
this.endpoint = endpoint;
this.tags = Tags.of(client.tag(), endpoint.tag());
+
this.requests = registry.timer(nameOf("requests"), tags);
this.responseErrors = registry.timer(nameOf("responseErrors"), tags);
this.responseSuccesses = registry.timer(nameOf("responseSuccesses"), tags);
@@ -102,63 +111,36 @@ public RntbdMetrics(RntbdTransportClient client, RntbdEndpoint endpoint) {
.register(registry);
Gauge.builder(nameOf("usedDirectMemory"), endpoint, x -> x.usedDirectMemory())
- .description("Java direct memory usage")
+ .description("Java direct memory usage (MiB)")
.baseUnit("bytes")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("usedHeapMemory"), endpoint, x -> x.usedHeapMemory())
- .description("Java heap memory usage")
+ .description("Java heap memory usage (MiB)")
.baseUnit("MiB")
.tags(this.tags)
.register(registry);
+
+ this.requestSize = DistributionSummary.builder(nameOf("requestSize"))
+ .description("Request size (bytes)")
+ .baseUnit("bytes")
+ .tags(this.tags)
+ .register(registry);
+
+ this.responseSize = DistributionSummary.builder(nameOf("responseSize"))
+ .description("Response size (bytes)")
+ .baseUnit("bytes")
+ .tags(this.tags)
+ .register(registry);
}
// endregion
// region Accessors
- @JsonIgnore
- private static synchronized MeterRegistry consoleLoggingRegistry(final int step) {
-
- if (consoleLoggingRegistry == null) {
-
- MetricRegistry dropwizardRegistry = new MetricRegistry();
-
- ConsoleReporter consoleReporter = ConsoleReporter
- .forRegistry(dropwizardRegistry)
- .convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .build();
-
- consoleReporter.start(step, TimeUnit.SECONDS);
-
- DropwizardConfig dropwizardConfig = new DropwizardConfig() {
-
- @Override
- public String get(@Nullable String key) {
- return null;
- }
-
- @Override
- public String prefix() {
- return "console";
- }
-
- };
-
- consoleLoggingRegistry = new DropwizardMeterRegistry(dropwizardConfig, dropwizardRegistry, HierarchicalNameMapper.DEFAULT, Clock.SYSTEM) {
- @Override
- @Nonnull
- protected Double nullGaugeValue() {
- return Double.NaN;
- }
- };
-
- consoleLoggingRegistry.config().namingConvention(NamingConvention.dot);
- }
-
- return consoleLoggingRegistry;
+ public static void add(MeterRegistry registry) {
+ RntbdMetrics.registry.add(registry);
}
@JsonProperty
@@ -172,13 +154,13 @@ public int channelsAvailable() {
}
/***
- * Computes the number of successful (non-error) responses received divided by the number of completed requests
+ * Computes the number of successful (non-error) responses received divided by the number of completed requests.
*
- * @return The number of successful (non-error) responses received divided by the number of completed requests
+ * @return number of successful (non-error) responses received divided by the number of completed requests.
*/
@JsonProperty
public double completionRate() {
- return this.responseSuccesses.count() / (double)this.requests.count();
+ return this.responseSuccesses.count() / (double) this.requests.count();
}
@JsonProperty
@@ -197,13 +179,18 @@ public int requestQueueLength() {
}
@JsonProperty
- public Iterable requests() {
- return this.requests.measure();
+ public HistogramSnapshot requestSize() {
+ return this.requestSize.takeSnapshot();
+ }
+
+ @JsonProperty
+ public HistogramSnapshot requests() {
+ return this.requests.takeSnapshot();
}
@JsonProperty
- public Iterable responseErrors() {
- return this.responseErrors.measure();
+ public HistogramSnapshot responseErrors() {
+ return this.responseErrors.takeSnapshot();
}
/***
@@ -213,12 +200,17 @@ public Iterable responseErrors() {
*/
@JsonProperty
public double responseRate() {
- return this.responseSuccesses.count() / (double)(this.requests.count() + this.endpoint.concurrentRequests());
+ return this.responseSuccesses.count() / (double) (this.requests.count() + this.endpoint.concurrentRequests());
}
@JsonProperty
- public Iterable responseSuccesses() {
- return this.responseSuccesses.measure();
+ public HistogramSnapshot responseSize() {
+ return this.responseSize.takeSnapshot();
+ }
+
+ @JsonProperty
+ public HistogramSnapshot responseSuccesses() {
+ return this.responseSuccesses.takeSnapshot();
}
@JsonProperty
@@ -240,16 +232,12 @@ public long usedHeapMemory() {
// region Methods
- public static void add(MeterRegistry registry) {
- RntbdMetrics.registry.add(registry);
- }
-
- public void markComplete(RntbdRequestRecord record) {
- record.stop(this.requests, record.isCompletedExceptionally() ? this.responseErrors : this.responseSuccesses);
- }
-
- public static String escape(String value) {
- return escaper.escape(value);
+ public void markComplete(RntbdRequestRecord requestRecord) {
+ requestRecord.stop(this.requests, requestRecord.isCompletedExceptionally()
+ ? this.responseErrors
+ : this.responseSuccesses);
+ this.requestSize.record(requestRecord.requestLength());
+ this.responseSize.record(requestRecord.responseLength());
}
@Override
@@ -261,8 +249,51 @@ public String toString() {
// region Private
+ static String escape(String value) {
+ return PERCENT_ESCAPER.escape(value);
+ }
+
+ private static MeterRegistry consoleLoggingRegistry(final int step) {
+
+ final MetricRegistry dropwizardRegistry = new MetricRegistry();
+
+ ConsoleReporter consoleReporter = ConsoleReporter
+ .forRegistry(dropwizardRegistry)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+
+ consoleReporter.start(step, TimeUnit.SECONDS);
+
+ DropwizardConfig dropwizardConfig = new DropwizardConfig() {
+
+ @Override
+ public String get(@Nullable String key) {
+ return null;
+ }
+
+ @Override
+ public String prefix() {
+ return "console";
+ }
+
+ };
+
+ final MeterRegistry consoleLoggingRegistry = new DropwizardMeterRegistry(
+ dropwizardConfig, dropwizardRegistry, HierarchicalNameMapper.DEFAULT, Clock.SYSTEM) {
+ @Override
+ @Nonnull
+ protected Double nullGaugeValue() {
+ return Double.NaN;
+ }
+ };
+
+ consoleLoggingRegistry.config().namingConvention(NamingConvention.dot);
+ return consoleLoggingRegistry;
+ }
+
private static String nameOf(final String member) {
- return prefix + member;
+ return "azure.cosmos.directTcp." + member;
}
// endregion
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestArgs.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestArgs.java
index 2efa46449ada9..940eed0f55a92 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestArgs.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestArgs.java
@@ -15,9 +15,9 @@
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
-import java.math.BigDecimal;
import java.net.URI;
import java.time.Duration;
+import java.time.OffsetDateTime;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -27,7 +27,7 @@
import static io.micrometer.core.instrument.Timer.Sample;
@JsonPropertyOrder({
- "transportRequestId", "origin", "replicaPath", "activityId", "operationType", "resourceType", "creationTime",
+ "transportRequestId", "activityId", "origin", "replicaPath", "operationType", "resourceType", "timeCreated",
"lifetime"
})
public final class RntbdRequestArgs {
@@ -36,7 +36,8 @@ public final class RntbdRequestArgs {
private final Sample sample;
private final UUID activityId;
- private final long creationTime;
+ private final OffsetDateTime timeCreated;
+ private final long nanoTimeCreated;
private final Stopwatch lifetime;
private final String origin;
private final URI physicalAddress;
@@ -47,7 +48,8 @@ public final class RntbdRequestArgs {
public RntbdRequestArgs(final RxDocumentServiceRequest serviceRequest, final URI physicalAddress) {
this.sample = Timer.start();
this.activityId = UUID.fromString(serviceRequest.getActivityId());
- this.creationTime = System.nanoTime();
+ this.timeCreated = OffsetDateTime.now();
+ this.nanoTimeCreated = System.nanoTime();
this.lifetime = Stopwatch.createStarted();
this.origin = physicalAddress.getScheme() + "://" + physicalAddress.getAuthority();
this.physicalAddress = physicalAddress;
@@ -63,17 +65,17 @@ public UUID activityId() {
return this.activityId;
}
- @JsonProperty
- public long creationTime() {
- return this.creationTime;
- }
-
@JsonSerialize(using = ToStringSerializer.class)
@JsonProperty
public Duration lifetime() {
return this.lifetime.elapsed();
}
+ @JsonIgnore
+ public long nanoTimeCreated() {
+ return this.nanoTimeCreated;
+ }
+
@JsonProperty
public String origin() {
return this.origin;
@@ -94,6 +96,11 @@ public RxDocumentServiceRequest serviceRequest() {
return this.serviceRequest;
}
+ @JsonProperty
+ public OffsetDateTime timeCreated() {
+ return this.timeCreated;
+ }
+
@JsonProperty
public long transportRequestId() {
return this.transportRequestId;
@@ -114,18 +121,17 @@ public String toString() {
return RntbdObjectMapper.toString(this);
}
- public void traceOperation(final Logger logger, final ChannelHandlerContext context, final String operationName, final Object... args) {
+ public void traceOperation(
+ final Logger logger, final ChannelHandlerContext context, final String operationName, final Object... args) {
- checkNotNull(logger, "logger");
+ checkNotNull(logger, "expected non-null logger");
- if (logger.isTraceEnabled()) {
- final BigDecimal lifetime = BigDecimal.valueOf(this.lifetime.elapsed().toNanos(), 6);
- logger.trace("{},{},\"{}({})\",\"{}\",\"{}\"", this.creationTime, lifetime, operationName,
- Stream.of(args).map(arg ->
- arg == null ? "null" : arg.toString()).collect(Collectors.joining(",")
- ),
- this, context
- );
+ if (logger.isDebugEnabled()) {
+ logger.debug("{},{},\"{}({})\",\"{}\",\"{}\"", this.timeCreated, this.lifetime.elapsed(), operationName,
+ Stream.of(args)
+ .map(arg -> arg == null ? "null" : arg.toString())
+ .collect(Collectors.joining(",")),
+ this, context);
}
}
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestDecoder.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestDecoder.java
index 3192bd80c62c7..46cb85f0fb328 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestDecoder.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/directconnectivity/rntbd/RntbdRequestDecoder.java
@@ -11,7 +11,7 @@
public final class RntbdRequestDecoder extends ByteToMessageDecoder {
/**
- * Prepare for decoding an @{link RntbdRequest} or fire a channel readTree event to pass the input message along
+ * Prepare for decoding an @{link RntbdRequest} or fire a channel readTree event to pass the input message along.
*
* @param context the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param message the message to be decoded
@@ -35,18 +35,22 @@ public void channelRead(final ChannelHandlerContext context, final Object messag
}
/**
- * Decode the input {@link ByteBuf} to an RntbdRequest instance
+ * Decode the input {@link ByteBuf} to an {@link RntbdRequest} instance.
*
* This method will be called till either the input {@link ByteBuf} has nothing to readTree after return from this
* method or till nothing was readTree from the input {@link ByteBuf}.
*
- * @param context the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
- * @param in the {@link ByteBuf} from which to readTree data
- * @param out the {@link List} to which decoded messages should be added
+ * @param context the {@link ChannelHandlerContext} to which this {@link ByteToMessageDecoder} belongs.
+ * @param in the {@link ByteBuf} from which to read data.
+ * @param out the {@link List} to which decoded messages should be added.
+ *
* @throws IllegalStateException thrown if an error occurs
*/
@Override
- protected void decode(final ChannelHandlerContext context, final ByteBuf in, final List