Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cosmos 3.X | Address memory leak in Direct TCP transport client #9211

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.data.cosmos.internal;

import com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdObjectMapper;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.google.common.collect.ImmutableList;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Iterator;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Represents the startTime and duration of important events in the lifetime of a request.
* <p>
* A {@link RequestTimeline} represents a timeline as a sequence of {@link Event} instances with name, startTime, and
* duration properties. Hence, one might use this class to represent any timeline. Today we use it to represent
* request timelines for:
* <p><ul>
* <li>{@link com.azure.cosmos.implementation.http.HttpClient#send},
* <li>{@link com.azure.cosmos.implementation.directconnectivity.HttpTransportClient#invokeStoreAsync}, and
* <li>{@link com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient#invokeStoreAsync}.
* </ul></p>
* A {@link RequestTimeline} serializes to JSON as an array of {@link Event} instances. This is the default
* serialization for any class that implements {@link Iterable}.
* <p>
* <b>Example:</b>
* <pre>{@code OffsetDateTime startTime = OffsetDateTime.parse("2020-01-07T11:24:12.842749-08:00", DateTimeFormatter.ISO_OFFSET_DATE_TIME);
* sys.out.println(RequestTimeline.of(
* new RequestTimeline.Event("foo", startTime, startTime.plusSeconds(1)),
* new RequestTimeline.Event("bar", startTime.plusSeconds(1), startTime.plusSeconds(2))));}</pre>
* JSON serialization:
* <pre>{@code [{"name":"foo","startTime":"2020-01-07T11:24:12.842749-08:00","duration":"PT1S"},{"name":"bar","startTime":"2020-01-07T11:24:13.842749-08:00","duration":"PT1S"}])}</pre>
*/
public final class RequestTimeline implements Iterable<RequestTimeline.Event> {

private static final RequestTimeline EMPTY = new RequestTimeline();
private final ImmutableList<Event> events;

private RequestTimeline() {
this.events = ImmutableList.of();
}

private RequestTimeline(final ImmutableList<Event> events) {
checkNotNull(events, "expected non-null events");
this.events = events;
}

/**
* Returns an empty {@link RequestTimeline}.
*
* The empty startTime line returned is static.
*
* @return an empty {@link RequestTimeline}.
*/
public static RequestTimeline empty() {
return EMPTY;
}

/**
* Returns an iterator for enumerating the {@link Event} instances in this {@link RequestTimeline}.
*
* @return an iterator for enumerating the {@link Event} instances in this {@link RequestTimeline}.
*/
@Override
public Iterator<Event> iterator() {
return this.events.iterator();
}

/**
* Returns an empty {@link RequestTimeline}.
*
* The empty startTime line returned is static and equivalent to calling {@link RequestTimeline#empty}.
*
* @return an empty request timeline.
*/
public static RequestTimeline of() {
return EMPTY;
}

/**
* Returns a new {@link RequestTimeline} with a single event.
*
* @return a new {@link RequestTimeline} with a single event.
*/
public static RequestTimeline of(final Event event) {
return new RequestTimeline(ImmutableList.of(event));
}

/**
* Returns a new {@link RequestTimeline} with a pair of events.
*
* @return a new {@link RequestTimeline} with a pair of events.
*/
public static RequestTimeline of(final Event e1, final Event e2) {
return new RequestTimeline(ImmutableList.of(e1, e2));
}

/**
* Returns a new {@link RequestTimeline} with three events.
*
* @return a new {@link RequestTimeline} with three events.
*/
public static RequestTimeline of(final Event e1, final Event e2, final Event e3) {
return new RequestTimeline(ImmutableList.of(e1, e2, e3));
}

/**
* Returns a new {@link RequestTimeline} with four events.
*
* @return a new {@link RequestTimeline} with four events.
*/
public static RequestTimeline of(final Event e1, final Event e2, final Event e3, final Event e4) {
return new RequestTimeline(ImmutableList.of(e1, e2, e3, e4));
}

/**
* Returns a new {@link RequestTimeline} with five events.
*
* @return a new {@link RequestTimeline} with five events.
*/
public static RequestTimeline of(final Event e1, final Event e2, final Event e3, final Event e4, final Event e5) {
return new RequestTimeline(ImmutableList.of(e1, e2, e3, e4, e5));
}

/**
* Returns a new {@link RequestTimeline} with an arbitrary number of events.
*
* @return a new {@link RequestTimeline} with an arbitrary number of events.
*/
public static RequestTimeline of(final Event... events) {
return new RequestTimeline(ImmutableList.copyOf(events));
}

/**
* Returns a textual representation of this {@link RequestTimeline}.
* <p>
* The textual representation returned is a string of the form {@code RequestTimeline(}<i> &lt;event-array&gt;</i>
* {@code )}.
*/
@Override
public String toString() {
return RntbdObjectMapper.toString(this);
}

@JsonPropertyOrder({ "name", "startTime", "durationInMicroSec" })
public static final class Event {

@JsonIgnore
private final Duration duration;

@JsonSerialize(using = ToStringSerializer.class)
private final long durationInMicroSec;

@JsonProperty("eventName")
private final String name;

@JsonSerialize(using = ToStringSerializer.class)
private final OffsetDateTime startTime;

public Event(final String name, final OffsetDateTime from, final OffsetDateTime to) {

checkNotNull(name, "expected non-null name");

this.name = name;
this.startTime = from;

this.duration = from == null ? null : to == null ? Duration.ZERO : Duration.between(from, to);
if(this.duration != null) {
this.durationInMicroSec = duration.toNanos()/1000L;
} else {
this.durationInMicroSec = 0;
}
}

public Duration getDuration() {
return this.duration;
}

public String getName() {
return name;
}

public OffsetDateTime getStartTime() {
return startTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.data.cosmos.internal.directconnectivity;

import com.azure.data.cosmos.BridgeInternal;
import com.azure.data.cosmos.internal.Configs;
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.UserAgentContainer;
Expand All @@ -23,7 +24,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -122,14 +122,19 @@ public Mono<StoreResponse> invokeStoreAsync(final URI address, final RxDocumentS

logger.debug("RntbdTransportClient.invokeStoreAsync({}, {}): {}", address, request, record);

return Mono.fromFuture(record).doFinally(signalType -> {
logger.debug("SignalType.{} received from reactor: {\n endpoint: {},\n record: {}\n}",
signalType.name(),
endpoint,
record);
if (signalType == SignalType.CANCEL) {
record.stage(RntbdRequestRecord.Stage.CANCELLED_BY_CLIENT);
return Mono.fromFuture(record.whenComplete((response, throwable) -> {

record.stage(RntbdRequestRecord.Stage.COMPLETED);

if (request.requestContext.cosmosResponseDiagnostics == null) {
request.requestContext.cosmosResponseDiagnostics = BridgeInternal.createCosmosResponseDiagnostics();
}

if(response != null) {
logger.debug("%s", record.takeTimelineSnapshot());
}
})).doOnCancel(() -> {
logger.debug("REQUEST CANCELLED: {}", record);
});
}

Expand Down Expand Up @@ -319,6 +324,48 @@ public String toString() {

// region Types

/**
* A builder for constructing {@link Options} instances.
*
* <h3>Using system properties to set the default {@link Options} used by an {@link Builder}</h3>
* <p>
* A default options instance is created when the {@link Builder} class is initialized. This instance specifies
* the default options used by every {@link Builder} instance. In priority order the default options instance
* is created from:
* <ol>
* <li>The JSON value of system property {@code azure.cosmos.directTcp.defaultOptions}.
* <p>Example:
* <pre>{@code -Dazure.cosmos.directTcp.defaultOptions={\"maxChannelsPerEndpoint\":5,\"maxRequestsPerChannel\":30}}</pre>
* </li>
* <li>The contents of the JSON file located by system property {@code azure.cosmos.directTcp
* .defaultOptionsFile}.
* <p>Example:
* <pre>{@code -Dazure.cosmos.directTcp.defaultOptionsFile=/path/to/default/options/file}</pre>
* </li>
* <li>The contents of JSON resource file {@code azure.cosmos.directTcp.defaultOptions.json}.
* <p>Specifically, the resource file is read from this stream:
* <pre>{@code RntbdTransportClient.class.getClassLoader().getResourceAsStream("azure.cosmos.directTcp.defaultOptions.json")}</pre>
* <p>Example: <pre>{@code {
* "bufferPageSize": 8192,
* "connectionTimeout": "PT1M",
* "idleChannelTimeout": "PT0S",
* "idleEndpointTimeout": "PT1M10S",
* "maxBufferCapacity": 8388608,
* "maxChannelsPerEndpoint": 10,
* "maxRequestsPerChannel": 30,
* "receiveHangDetectionTime": "PT1M5S",
* "requestExpiryInterval": "PT5S",
* "requestTimeout": "PT1M",
* "requestTimerResolution": "PT0.5S",
* "sendHangDetectionTime": "PT10S",
* "shutdownTimeout": "PT15S"
* }}</pre>
* </li>
* </ol>
* <p>JSON value errors are logged and then ignored. If none of the above values are available or all available
* values are in error, the default options instance is created from the private parameterless constructor for
* {@link Options}.
*/
@SuppressWarnings("UnusedReturnValue")
public static class Builder {

Expand All @@ -329,16 +376,6 @@ public static class Builder {

static {

// In priority order we take default Direct TCP options from:
//
// 1. the string value of system property "azure.cosmos.directTcp.options", or
// 2. the contents of the file located by the system property "azure.cosmos.directTcp.optionsFile", or
// 3. the contents of the resource file named "azure.cosmos.directTcp.options.json"
//
// Otherwise, if none of these values are set or an error occurs we create default options based on a
// set of hard-wired values defined in the default private parameterless constructor for
// RntbdTransportClient.Options.

Options options = null;

try {
Expand Down Expand Up @@ -373,7 +410,7 @@ public static class Builder {
final ClassLoader loader = RntbdTransportClient.class.getClassLoader();
final String name = DEFAULT_OPTIONS_PROPERTY_NAME + ".json";

try (final InputStream stream = loader.getResourceAsStream(name)) {
try (InputStream stream = loader.getResourceAsStream(name)) {
if (stream != null) {
// Attempt to load default options from the JSON resource file "{propertyName}.json"
options = RntbdObjectMapper.readValue(stream, Options.class);
Expand All @@ -383,7 +420,14 @@ public static class Builder {
}
}
} finally {
DEFAULT_OPTIONS = options != null ? options : new Options();
if (options == null) {
DEFAULT_OPTIONS = new Options();
} else {
logger.info("Updated default Direct TCP options from system property {}: {}",
DEFAULT_OPTIONS_PROPERTY_NAME,
options);
DEFAULT_OPTIONS = options;
}
}
}

Expand Down Expand Up @@ -553,7 +597,9 @@ public Builder userAgent(final UserAgentContainer value) {

static final class JsonSerializer extends StdSerializer<RntbdTransportClient> {

public JsonSerializer() {
private static final long serialVersionUID = 1007663695768825670L;

JsonSerializer() {
super(RntbdTransportClient.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class RntbdClientChannelHandler extends ChannelInitializer<Channel> imple
*/
@Override
public void channelAcquired(final Channel channel) {
logger.trace("{} CHANNEL ACQUIRED", channel);
logger.debug("{} CHANNEL ACQUIRED", channel);
}

/**
Expand All @@ -56,7 +56,7 @@ public void channelAcquired(final Channel channel) {
*/
@Override
public void channelCreated(final Channel channel) {
logger.trace("{} CHANNEL CREATED", channel);
logger.debug("{} CHANNEL CREATED", channel);
this.initChannel(channel);
}

Expand All @@ -69,7 +69,7 @@ public void channelCreated(final Channel channel) {
*/
@Override
public void channelReleased(final Channel channel) {
logger.trace("{} CHANNEL RELEASED", channel);
logger.debug("{} CHANNEL RELEASED", channel);
}

/**
Expand Down
Loading