Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory Mode: Adding support for OTLP HTTP Exporter - First draft #6171

Closed

Conversation

asafm
Copy link
Contributor

@asafm asafm commented Jan 23, 2024

This PR aims to showcase the idea of using pooled mutable Marshaler objects instead of immutable ones to add support to Memory Mode.

I created two classes for each Marshaler: a mutable and an immutable version. Both inherit from a base class containing the shared logic of serializing and computing the size.

This was a definitive rabbit hole. I started with MetricsRequestMarshaller and ended up much deeper than I wanted :)
So I simply stopped at some point, marking with TODOs where to continue, and left many code sections uncompiled.

The main goal is to see if this pattern makes sense.

Notable things:

  • I wanted to eliminate byte[], so I switched it all to String since it already exists in the original MetricData input so that we can avoid memory allocation. I needed to convert the String to a UTF-8 byte array without memory allocation. I copied the Utf8 class from the protobuf library and trimmed it, only to have a conversion method from String to ByteBuffer containing the utf8 bytes. I reuse that ByteBuffer using thread-local. You don't need more than one.
  • I switched from primitive array to List everywhere since I needed the same data structure between the immutable and mutable. I'm using a new DynamicList in the mutable version, which is like the DynamicPrimitiveLongList. The idea is that whenever I need more room, I add a small ten-item array instead of duplicating it, which can sometimes be wasteful in space.

In short, the code is a rough draft, and attention must still be paid to the scope of methods and many other things - but in the future.

@asafm
Copy link
Contributor Author

asafm commented Jan 23, 2024

@jack-berg Here it is

@asafm
Copy link
Contributor Author

asafm commented Jan 31, 2024

@jack-berg Can you have a look at this draft to see if the idea is ok?

@jack-berg
Copy link
Member

There's a lot to review here.

I think we can break this apart to separate the establishment of the key patterns from the repetition of applying that pattern to all the different types in the MetricData hiearchy.

I think the place to do that is to stop going down the rabbit whole after MetricMarshaler. Up in OtlpHttpMetricExporter you have an if statement which forks into MutableMetricRequestMarshaler or ImmutableMetricsRequestMarshaler. You can implement MutableMetricRequestMarshaler in such a way that its still immutable in virtually all of its implementation except for 1 or 2 key spots to demonstrate the pattern of mutability. After we're comfortable with the pattern, we can incrementally work through the rest of the type hierarchy to translate all to the mutable alternative. (I'm happy to help with this to reduce the burden.)

I'm going to hold off on looking to closely its more manageable, but some high level comments:

  • I don't see any of the entries being returned to the object pool. I think the natural way to do this is to add something like delegate.export(exportRequest, metrics.size()).whenComplete(() -> exportRequest.returnObjects()) to OtlpHttpMetricExporter#export
  • One thing I worry about with this object pooling approach is a high overall memory usage. Suppose you have one metric with high cardinality, and a large number of other metrics with a single point. With object pooling, each entry in the pool will tend towards having a DynamicList allocated with enough entries to accommodate the high cardinality metrics. And you would end up with low allocation rate but higher than needed overall memory allocated. WDYT?

@laurit I know you had done some experimentation with this type of thing. I wonder if you have taken a look and have any thoughts?

@asafm
Copy link
Contributor Author

asafm commented Feb 5, 2024

@jack-berg

I think the place to do that is to stop going down the rabbit whole after MetricMarshaler. Up in OtlpHttpMetricExporter you have an if statement which forks into MutableMetricRequestMarshaler or ImmutableMetricsRequestMarshaler. You can implement MutableMetricRequestMarshaler in such a way that its still immutable in virtually all of its implementation except for 1 or 2 key spots to demonstrate the pattern of mutability. After we're comfortable with the pattern, we can incrementally work through the rest of the type hierarchy to translate all to the mutable alternative. (I'm happy to help with this to reduce the burden.)

The idea in this PR is to review it top to bottom in IntelliJ just to get the hang of the pattern and see if it makes sense so I can know if I can proceed to do it for all.

I can do the work of downsizing it, but it's a lot of work :) If it's absolutely hard and too difficult to view in IntelliJ, I will do it.

I don't see any of the entries being returned to the object pool. I think the natural way to do this is to add something like delegate.export(exportRequest, metrics.size()).whenComplete(() -> exportRequest.returnObjects()) to OtlpHttpMetricExporter#export

I didn't add it yet since I wanted confirmation on the pattern and proposal of immutable/mutable concept first. My plan was to add it, which will be one in a recursive manner.

One thing I worry about with this object pooling approach is a high overall memory usage. Suppose you have one metric with high cardinality, and a large number of other metrics with a single point. With object pooling, each entry in the pool will tend towards having a DynamicList allocated with enough entries to accommodate the high cardinality metrics. And you would end up with low allocation rate but higher than needed overall memory allocated. WDYT?

I have thought about it and implemented part of the solution in this PR.
So my solution for this is pooling the DynamicLists, and when you retrieve one from the pool, they will be sorted by capacity and you will get the closest one from the above. When you need to built a list of NumberDataPointMarshaller - which is where the high cardinality comes into play - you will most probably retrieve the list you have used in the previous collection, since it will be the biggest one. For the list of KeyValueMarshaller of Examplars which are presume won't be big by nature, we can either work with this approach as well, or have the list be reused in place. The latter is not very efficient, so I think it would be best to work with the first approach everytime we need a dynamic list.
When lists are not used for x number of collections / y minutes, we can get remove them from the pool.

@laurit
Copy link
Contributor

laurit commented Feb 8, 2024

Last year I experimented a bit reducing allocations for OTLP trace exporter. I took a slightly different approach. Instead of pooling the marshaler objects I removed them all together. Marshaling is essentially a 2 step process: firstly compute the size of the data and then serialize it. The need to know the size of each element (and elements can have child elements) before the element can be written to output stream makes it a bit tricky to do the marshaling without extra allocations. With the current immutable marshalers size is computed in the constructor and serialization is done in writeTo method.
In my experiment I create a MarshalerContext object that serves as the temporary storage needed for the marshaling. This object contains an int array, an Object array and indexes for both arrays, both of these arrays are grown when needed. This object can be reset and reused when the export is run again, once the internal arrays have grown to their optimal size there will be no more allocations. The int array is used to hold element sizes. When calculating size firstly reserve an offset in the size array, compute size of child elements add headers etc and finally set the size in the array. This step is repeated for each element. At the end of the size calculation the array will contain sizes for each element in the order the elements were visited. For example the first element in the array will contain the size of the root element which is the size of the payload. Writing the data isn't much different from how it was done with the marshaler objects. When previously size was read from the marshaler now serializing reads the size from the array and increments the index. Calculating the size and serializing must travers the objects exactly the same way for this to work correctly. The object array can be used to keep results for computations done in the size calculation phase so they wouldn't need to be done again in the serialization phase. For example if String is marshaled to byte[] as it is currently then the conversion would be done in the size calculation phase (because it needs to know the size of the marshaled string) and the resulting byte[] would be pushed into the Object array so that serialization phase wouldn't need to redo the conversion. I hope this made sense.
Performance wise, if I remember correctly, the jmh results were comparable. There were some small changes that helped to reduce allocations but have a slight negative impact on the jmh results. I would have hoped that eliminating allocations also noticeably improves the run time but that was not so. Guess jvm really is good at allocation and freeing object. For the object pooling approach I suspect being competitive (or not loosing too badly) in the jmh benchmarks may become a challenge.
I also implemented allocation free marshaling for Strings. I did not use a temporary buffer but rather computed the utf8 size with looping over the string with charAt and in the serialization phase repeated the same loop but now wrote the actual bytes instead of just computing the size. While this did not require additional allocations it was slower than using getBytes(StandardCharsets.UTF_8) on the string. Can't really compete with the intrinsics. It might be that if the test data actually contained multi byte chars (afaik the intrinsics assume that there are none and bail out when there are) the result would be better. Probably we could improve this by employing unsafe, if it is a latin1 string and all has 7bit chars then could just copy the underlying byte array to output.

@asafm
Copy link
Contributor Author

asafm commented Feb 12, 2024

@laurit Your design was the first on my list of possibilities to solve this. I was hesistant to go with it since I was afraid it would be too complicated to maintain, but after drawing out some pseudo code, maybe it's not so bad, and it has a significant advantage in that memory cost is a lot less.

If I summarize your approach: Basically, build a tree of objects that only contain the size., Then serialize using those pre-computed sizes.

If I put it in pseudo-code, it looks something like this:

interface MessageSize {
        long getFieldSize(int fieldPosition);
	long getSize();
}

class DefaultMessageSize implements MessgeSize {
	DynamicList<MessageSize> fieldsSize;
	long size;
}

The "node" in the tree is MessageSize. A leaf is a MessageSize without elements at fieldsSize.

Each type of message in the protobuf, which today has a *Marshaller class, would get two additional static methods:

  • MessageSize messageSize(...) - this will return the MessageSize for that message, which effectively means the size of the encoded protobuf message and the size of each field in that message contained in the fieldsSize list.
  • void encode(Serializer output, MessageSize messageSize, ...) - this will encode the message, utilizing the size of each field located in the messageSize.

The main idea is that messageSize() will add the message fields size in a certain order, and the encode() will encode them in exactly that order.

For example, if we take the protobuf message message ResourceMetrics, which is defined as:

message ResourceMetrics {
  reserved 1000;

  // The resource for the metrics in this message.
  // If this field is not set then no resource info is known.
  opentelemetry.proto.resource.v1.Resource resource = 1;

  // A list of metrics that originate from a resource.
  repeated ScopeMetrics scope_metrics = 2;

  // The Schema URL, if known. This is the identifier of the Schema that the resource data
  // is recorded in. To learn more about Schema URL see
  // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
  // This schema_url applies to the data in the "resource" field. It does not apply
  // to the data in the "scope_metrics" field which have their own schema_url field.
  string schema_url = 3;
}

then in ResourceMetricsMarshaler class, we will add the following:

static MessageSize messageSize(
   MarshallingObjectsPool marshallingObjectsPool
   Resource resource,
   Map<InstrumentationScope instrumentationScope, List<MetricData>> scopeToMetricData) {

  MessageSize messageSize = marshallingObjectsPool.getMessageSizePool().borrowObject();
  int expectedListSize = 
     1  // Resource
     + scopeToMetricData.keySet().size() // repeated scopeMetrics
     + 1 // schemaUrl
  DynamicList<MessageSize> fieldSizes = marshallingObjectsPool.getDynamicListPool().borrowList(expectedListSize);
  
  fieldSizes.add(ResourceMarshaler.messageSize(marshallingObjectsPool, resource);
  scopeToMetricData.forEach( (instrumentationScope, scopeMetricDataList) -> {
    fieldSizes.add(InstrumentationScopeMetricsMarshaler.messageSize(
      marshallingObjectsPool
      instrumentationScope,
      scopeMetricDataList
    ));
  });
  fieldSizes.add(MessgeSize.of(MarshalerUtil.sizeString((ResourceMetrics.SCHEMA_URL, schemaUrl), marshallingObjectsPool);
  
  messageSize.set(fieldSizes);
  return messageSize;
}

static encode(
  Serializer output,
  MessageSize resourceMetricsMessageSize,
  Resource resource,
  Map<InstrumentationScope instrumentationScope, List<MetricData>> scopeToMetricData) {

  int i = 0;
  ResourceMarshaler.encode(output, resourceMetricsMessageSize.getFieldSize(i++), resource);
  scopeToMetricData.forEach( (instrumentationScope, scopeMetricDataList) -> {
    InstrumentationScopeMetricsMarshaler.encode(
    output, 
    resourceMetricsMessageSize.getFieldSize(i++),
    instrumentationScope,
    scopeMetricDataList);
  });
  MarshalerUtil.encodeString(
    ResourceMetrics.SCHEMA_URL, 
    schemaUrl, 
    resourceMetricsMessageSize.getFieldSize(i++));
}

I'm ok with going with this approach.
@jack-berg WDYT?

@asafm
Copy link
Contributor Author

asafm commented Feb 20, 2024

@jack-berg WDYT?

@jack-berg
Copy link
Member

I think the approach is promising. Can we see what it looks like for a small portion of the encoding, like resource? Refactoring resource to use this approach should allow us to evaluate the patterns without all the work associated with refactoring the whole metric hierarchy.

@asafm
Copy link
Contributor Author

asafm commented Mar 6, 2024

@jack-berg @laurit Finally found the time and finished the prototype code for the above design. The PR is at: #6273

I've started at the KeyValue message and everything below it. I think it gives a good idea of how the code would look.

@jack-berg
Copy link
Member

This PR has been superseded. See 1.38.0 release notes for details.

@jack-berg jack-berg closed this May 13, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants