new component: deltatocumulative processor #30479

Closed
sh0rez opened this issue Jan 12, 2024 · 33 comments
Assignees
Labels
Accepted Component New component has been sponsored Stale

Comments

@sh0rez
Contributor

sh0rez commented Jan 12, 2024

Important

This component is in active development. See progress in #30705


The purpose and use-cases of the new component

In a similar fashion to the existing cumulativetodeltaprocessor, this component aggregates delta samples into their respective cumulative counterparts.

An extensive design doc has been written to explore this idea.

Example configuration for the component

interval: 60s
stale: 5m

Telemetry data types supported

  • metrics (sum, histogram, exp. histogram)

Code Owner(s)

@gouthamve @sh0rez

Sponsor (optional)

@jpkrohling

Additional context

Prior discussion:

@sh0rez sh0rez added needs triage New item requiring triage Sponsor Needed New component seeking sponsor labels Jan 12, 2024
@jpkrohling jpkrohling added Accepted Component New component has been sponsored and removed Sponsor Needed New component seeking sponsor needs triage New item requiring triage labels Jan 15, 2024
@jpkrohling
Member

I'm sponsoring this, as it's important for our Prometheus story.

@djaglowski
Member

djaglowski commented Jan 22, 2024

This is a duplicate of #29300, which I've also offered to sponsor. I'm glad to see there are others interested in this as well. The main thing I think we need to do in order to combine these proposals is decide whether or not the component will be split into two parts, as described here. I would appreciate your thoughts on this @sh0rez and @jpkrohling.

@RichieSams, looks like there is already an implementation started so I would recommend pausing until we work out which approach we are going to use.

Also cc: @0x006EA1E5 who proposed the split design.

@sh0rez
Contributor Author

sh0rez commented Jan 23, 2024

@djaglowski looking at the comment you referenced, it appears the proposal is the following:

  1. deltatocumulative: converts a stream of delta samples [1,2,3] to [1,3,6]
  2. time-aggregate: converts a stream of any samples [1,3,6] to [6] or similar, depending on aggregation

is that correct?

If so, that aligns perfectly with our work here, because #30705 implements exactly the behavior described in (1).
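As a rough illustration of behavior (1) only: the sketch below turns a slice of delta values into running totals. It is not the code from #30705; the `accumulate` helper is a made-up name, and timestamps, resets and stream identity are deliberately ignored.

```go
package main

import "fmt"

// accumulate turns a stream of delta values into running (cumulative) totals.
// Hypothetical helper for illustration only: it ignores timestamps, resets
// and stream identity, all of which a real processor has to handle.
func accumulate(deltas []float64) []float64 {
	out := make([]float64, 0, len(deltas))
	var sum float64
	for _, d := range deltas {
		sum += d
		out = append(out, sum)
	}
	return out
}

func main() {
	fmt.Println(accumulate([]float64{1, 2, 3})) // prints [1 3 6]
}
```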

/cc @RichieSams @0x006EA1E5 @gouthamve

@djaglowski
Member

That's great @sh0rez, thanks for clarifying. I'm excited to see this moving forward.

Based on this already having a more detailed proposal and implementation which is quite far along, I'll close #29300 and try to help #29461 in parallel.

@RichieSams
Contributor

@sh0rez Would you like any assistance in the implementation of #30705 ?

If not, I can put my effort towards #29461

@sh0rez
Contributor Author

sh0rez commented Jan 23, 2024

@RichieSams I think I'm covered in terms of code writing, but a review of #30706 and #30707 would be incredibly helpful!

Let me know if I can assist in #29461 in any way

@0x006EA1E5

Hi, sorry just catching up with this. I'm very happy this is moving forward, and happy to help wherever I can...

@0x006EA1E5

I've been trying to think of use-cases, edge cases etc.
Scenarios I can think of so far:

  1. The count connector example, where the count connector is producing a series of deltas, with no start_time_unix_nano
    • Producer is in same collector instance as this processor
    • We know that there is only a single producer, and that datapoints will be in-order.
  2. "Stateless delta producers". I'm thinking here of IoT, Web Clients, FaaS, that are all just pinging off deltas to an OTLP endpoint. We don't care about the producer instance ID or even the timestamps, we are just interested in aggregating a global count of increments.
  3. "Single well-behaved OTLP source". This is the case where we get a stream of delta data points from a "single-writer", and the data points contain each start_time_unix_nano
    • Producer is external to the collector instance in this example case, so could have gaps in the data due to network issues etc, right?
    • Can we assume datapoints are in-order?
  4. "Poorly-behaved OTLP source". This is the case where we get a stream of delta data points, but could be missing required attributes, may be out of order etc

I'm wondering, as long as this processor can handle case (1) above (well formatted delta datapoints missing a start_time_unix_nano), are we able to fix up any other problems with the data with preceding "sanitisation" processors? Especially, are we able to set/clear start_time_unix_nano and time_unix_nano for the data_points in the DataPoint Context?

Put another way, what is the simplest possible implementation of this processor, assuming upstream "sanitisation"?

@shorez

shorez commented Jan 24, 2024 via email

@0x006EA1E5

I think it makes sense for the configuration for this processor and #29461 to align with the cumulativetodeltaprocessor, specifically include and exclude.

All three processors would also need to have a similar "MetricIdentity" feature.

Would it make sense to factor out this common code?

There is also something somewhat similar in the prometheusexporter

As I understand it, we can use the metrics spec as a reference as to what is an "identifying" property when building the "Metric Identity" used to track a metric

@RichieSams
Contributor

RichieSams commented Jan 24, 2024

I think it makes sense for the configuration for this processor and #29461 to align with the cumulativetodeltaprocessor, specifically include and exclude.

All three processors would also need to have a similar "MetricIdentity" feature.

Would it make sense to factor out this common code?

There is also something somewhat similar in the prometheusexporter

As I understand it, we can use the metrics spec as a reference as to what is an "identifying" property when building the "Metric Identity" used to track a metric

I agree we should probably look at creating a shared set of Identity structs. I personally quite like the design @sh0rez came up with in his PR: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/30707/files#diff-1be26496a452fcb59494ba5ee5ec2d00ad6c9a34c78d1360ace4d5f055ac6cdaR10

Using hashes of the metric attributes, etc.

@sh0rez
Contributor Author

sh0rez commented Jan 24, 2024

@0x006EA1E5 I think it makes sense for the configuration for this processor and #29461 to align with the cumulativetodeltaprocessor, specifically include and exclude.

I was thinking about this during the implementation of this component and preliminarily decided against it.
I don't think it's common to have an exporter receive some metrics as delta and others as cumulative (happy to be proven wrong).
Even if that was required, the filterprocessor can be used to build such a pipeline. The filterprocessor is very sophisticated and we would essentially be implementing a "lite-version" here, that will likely always be worse.

@sh0rez
Contributor Author

sh0rez commented Jan 24, 2024

@0x006EA1E5 As I understand it, we can use the metrics spec as a reference as to what is an "identifying" property when building the "Metric Identity" used to track a metric

Exactly! I wrote a bit about this in the design doc: https://docs.google.com/document/d/1Oqwl5rDLqB6-Qgd6Hy1PXYZBAH4pkcdudxNA7bRkrIc/edit#heading=h.a00fffk0v68v


@RichieSams I agree we should probably look at creating a shared set of Identity structs. I personally quite like the design @sh0rez came up with in his PR: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/30707/files#diff-1be26496a452fcb59494ba5ee5ec2d00ad6c9a34c78d1360ace4d5f055ac6cdaR10

yeah I felt like this was generally useful while writing this, happy to factor this out to a common place! use-cases I can think of right now are deltatocumulativeprocessor, cumulativetodeltaprocessor, intervalprocessor. The prometheusexporter currently does something like this as well for aggregating deltas, but it may as well stop doing this entirely once deltatocumulative is stable

@0x006EA1E5

I agree we should probably look at creating a shared set of Identity structs. I personally quite like the design @sh0rez came up with in his PR: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/30707/files#diff-1be26496a452fcb59494ba5ee5ec2d00ad6c9a34c78d1360ace4d5f055ac6cdaR10

Using hashes of the metric attributes, etc.

When I first looked at this, my first thought was to use some kind of non-crypto hash too, but then I looked at cumulativetodeltaprocessor and prometheusexporter and saw they both seemed to just be doing some kind of simple string concat.

It made me wonder, is the space saving worth the hashing cost (which has to be performed on every single input), vs the cost of using a string builder? How big do you make the hash to avoid collisions, etc?

I don't know the answer to that, my feeling would be that any gain may be marginal at best, so for my POC implementation I just went with concat + separator (basically a copy / paste from the prometheus exporter).

As I implied above, once I started to analyse this in more detail, I noticed that this is something the cumulativetodeltaprocessor already does (as well as the prometheusexporter). So I guess it is something that is likely to be needed in general? And if so it makes sense to implement it once, at the least so the behaviour is consistent.

In general, my heuristic approach would be, if in doubt, keep it simple, and optimise later when you know there is a concrete need.

That's not to say I'm against using a hash, just that I wasn't personally confident it was worth the cost of change etc, compared to simply factoring out and reusing the current cumulativetodeltaprocessor's implementation.

Indeed, if the consensus is that a hash is a better fit here, I'm keen to hear it! 😄

@RichieSams
Contributor

It made me wonder, is the space saving worth the hashing cost (which has to be performed on every single input), vs the cost of using a string builder? How big do you make the hash to avoid collisions, etc?

But they're feeding the string directly into golang's hash function (because they're using the string as the key lookup in a map). So it will be basically identical. We're just skipping the step of having to keep the big byte array in memory / use a pool of buffers

@0x006EA1E5

But they're feeding the string directly into golang's hash function (because they're using the string as the key lookup in a map). So it will be basically identical. We're just skipping the step of having to keep the big byte array in memory / use a pool of buffers

Doesn't the sync.Map have to hash the key again though, even if the key is itself the result of a hash? (Does the golang map need keys to be well distributed?)

Is the hash used here not going to get occasional collisions? Collisions are a fair trade off in a map, as any collision will be resolved by the map also having the actual key to compare. But wouldn't you want a much bigger hash if it's intended as a UUID and you want to be sure there won't be any collisions?

What is the expected benefit of using a hash?

What are the risks/consequences of something going wrong?

Just kind of worries me, when it is not even clear that there is a significant problem here that needs to be solved in the first place 🤷

@RichieSams
Contributor

RichieSams commented Jan 25, 2024

sync.Map() (and golang native map) call Hash() on the keys. So cumulativetodelta uses a string key. The runtime has a Hash() function for native strings, so it uses that. For @sh0rez 's solution, we just directly implement Hash() ourselves for Ident. Meaning we don't need to do the string building.

Both solutions will do a hash. That's just how maps work.

Is the hash used here not going to get occasional collisions?

Yes, which is fine. The std-lib map implementation needs to deal with hash collisions anyway. So this isn't a problem. FNV has "good-enough" hash properties for our use-case.

What is the expected benefit of using a hash?

Just to clarify, we're comparing:

  1. cumulativetodelta approach: Create a big unique string, which is then hashed by the stdlib
  2. @sh0rez approach: Implement Hash() ourselves by using fnv to combine the unique data

@sh0rez's approach is cleaner / clearer about what is happening IMO. And we don't need to do large string builder manipulations.

What are the risks/consequences of something going wrong?

The only risk IMO is forgetting to include a "unique" bit of data in the hash combination. But that should be trivial to audit.
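To make the two options concrete, here is a minimal sketch that derives a key from the same identifying data both ways. It is an assumption-laden illustration, not the code of either processor: `stringKey`, `hashKey` and `sortedKeys` are hypothetical names, attributes are simplified to `map[string]string`, and a zero byte is used as the separator.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strings"
)

// sortedKeys returns the attribute names in a stable order, so the same
// attribute set always produces the same key.
func sortedKeys(attrs map[string]string) []string {
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// stringKey is approach 1: build one long string from the identifying data.
// A map keyed by this string lets the runtime hash it on every lookup.
func stringKey(name string, attrs map[string]string) string {
	var b strings.Builder
	b.WriteString(name)
	for _, k := range sortedKeys(attrs) {
		b.WriteByte(0)
		b.WriteString(k)
		b.WriteByte(0)
		b.WriteString(attrs[k])
	}
	return b.String()
}

// hashKey is approach 2: feed the same bytes into FNV-1a directly,
// without materialising the intermediate string.
func hashKey(name string, attrs map[string]string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(name))
	for _, k := range sortedKeys(attrs) {
		h.Write([]byte{0})
		h.Write([]byte(k))
		h.Write([]byte{0})
		h.Write([]byte(attrs[k]))
	}
	return h.Sum64()
}

func main() {
	attrs := map[string]string{"method": "GET", "host": "a"}
	fmt.Printf("%q\n", stringKey("requests", attrs))
	fmt.Printf("%x\n", hashKey("requests", attrs))
}
```

Both functions walk the same fields in the same order, which is where the "forgot to include a unique bit of data" risk would show up in either approach.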

@0x006EA1E5

I was thinking about this during the implementation of this component and preliminarily decided against it. I don't think it's common to have an exporter receive some metrics as delta and others as cumulative (happy to be proven wrong). Even if that was required, the filterprocessor can be used to build such a pipeline. The filterprocessor is very sophisticated and we would essentially be implementing a "lite-version" here, that will likely always be worse.

@sh0rez I guess it would be nice to know why the cumulativetodeltaprocessor has this include/exclude config in the first place (PR: #8952). Perhaps it predates some of the other features that can do it better?

@0x006EA1E5

@RichieSams Sorry, I'm being slow today (I actually have mild COVID, so my brain is not really cooperating 😅)

Yes, which is fine. The std-lib map implementation needs to deal with hash collisions anyway. So this isn't a problem.

But how can it deal with a collision if the hash is done to create the actual key? 🤔

In a map, hash collisions are just a performance hit, as the hash is just telling the map which bucket to look in, so a collision means that more than one item is in the bucket. Worst case scenario, the map iterates over the bucket items until it finds an exact match for the key.

In the case of using a long string as the key, the map can always compare one long string to another if needed, and get the right item.

In the (admittedly very rare) case of a new metric producing the same hash as another, already tracked metric, you'll get the original metric's tracker returned by the map.

Are you then going to compare the tracked attributes etc against the incoming item, to check there hasn't been a collision? What do you do if there is a collision, track both items under the same key? You're then kind of implementing a map on top of a map.

That's obviously not viable, so you're going to end up just saying that collisions happen so infrequently that you'll pretend they don't happen.

So, for me, the consequences of something going wrong are that, (very infrequently) you'll mangle the user's data, and add deltas from one metric to another's cumulative.

You're sacrificing a tiny bit of correctness, so that you don't need to build and store the string.
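The concern can be shown with a small sketch. It assumes the map is keyed by nothing but a fixed-size hash, and it uses a deliberately terrible hash (`weakHash`, the string length) so that a collision is easy to trigger; a real 64-bit hash would collide far more rarely. The names `weakHash` and `track` are hypothetical.

```go
package main

import "fmt"

// weakHash is a deliberately bad hash (the string length) so a collision is
// trivial to provoke. It stands in for any fixed-size hash used as the key.
func weakHash(id string) uint64 { return uint64(len(id)) }

// track adds one increment per identity to two maps: one keyed only by the
// hash, and one keyed by the full identity string.
func track(ids []string) (byHash map[uint64]float64, byIdentity map[string]float64) {
	byHash = map[uint64]float64{}
	byIdentity = map[string]float64{}
	for _, id := range ids {
		byHash[weakHash(id)]++
		byIdentity[id]++
	}
	return byHash, byIdentity
}

func main() {
	// Two distinct streams whose identities happen to have the same length.
	byHash, byIdentity := track([]string{"cpu{host=a}", "cpu{host=b}"})

	// The hash-keyed map merged both streams into one entry; the map keyed
	// by the full identity kept them apart.
	fmt.Println(len(byHash), len(byIdentity)) // prints 1 2
}
```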

@0x006EA1E5

You're sacrificing a tiny bit of correctness, so that you don't need to build and store the string.

Actually, looking at this, the code here is actually storing all the metric's attributes in the map (am I reading that right?)

Something like this is required, as the emitted cumulative data points need to have all the same attributes as the source datapoint (I guess the alternative would be to copy from each incoming delta datapoint and not store it at all)

So, wouldn't it be possible to have the name, all the attributes etc, as the key, then the map's item could just be the cumulative sum and the timestamps etc? 🤔 This way there isn't even a memory overhead, as you've just moved the data from the map's value to the key.

Maybe that doesn't work for some reason?

@sh0rez
Contributor Author

sh0rez commented Jan 29, 2024

@0x006EA1E5 @RichieSams let me try to provide some clarity on metric identity and hashing.

The OTel metrics spec defines the fields required to identify a metric, and also how to identify a single stream within a metric. I have written about that in my design doc: https://docs.google.com/document/d/1Oqwl5rDLqB6-Qgd6Hy1PXYZBAH4pkcdudxNA7bRkrIc/edit#heading=h.a00fffk0v68v. This is also implemented in streams.Identity in my code.
For each distinct stream S there will be one and only one identity I. Each identity points to only one stream. There cannot be collisions by design. The streams are either different, or the identifying attributes in the spec are incomplete, which is a bug.

Go has the concept of comparable data types. Those need to follow some data rules, but then can be directly used with == and as map keys. Go does this by auto-generating hidden hash functions for those.
My Ident data types also have a custom Hash() function which uses the fnv64a algorithm, but this is not used by the current implementation aside from String() printing, so don't confuse these.

Because map (and by extension pcommon.Map) is not comparable, the attributes are the only field that is converted to uint64 by pdatautil.MapHash. The other identifying properties are int or string, which can be copied as-is.

Having those Ident types be structs which include all identifying information (as per spec) as members makes it a little easier to inspect / debug those. It comes at a slight memory cost, as they are bigger than plain hashes. If we ever find that is an issue, we can switch to calling Hash() on these instead, bringing the memory footprint down to uint64. I don't think this will be necessary though.
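A simplified sketch of this idea follows. It is not the `streams.Ident` type from the PR: the `Ident` fields, `hashAttrs` and `identOf` are hypothetical, attributes are reduced to `map[string]string`, and `hashAttrs` is only a stand-in for what the comment above attributes to pdatautil.MapHash.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// Ident is a hypothetical, simplified identity. Every field is comparable,
// so the struct can be compared with == and used directly as a map key.
type Ident struct {
	Name  string
	Unit  string
	Attrs uint64 // hash of the attribute set, because a Go map is not comparable
}

// hashAttrs reduces an attribute set to a uint64 in a key-order-independent
// way. Stand-in for the real attribute hashing; an assumption of this sketch.
func hashAttrs(attrs map[string]string) uint64 {
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := fnv.New64a()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte{0})
		h.Write([]byte(attrs[k]))
		h.Write([]byte{0})
	}
	return h.Sum64()
}

// identOf builds the identity for one stream.
func identOf(name, unit string, attrs map[string]string) Ident {
	return Ident{Name: name, Unit: unit, Attrs: hashAttrs(attrs)}
}

func main() {
	sums := map[Ident]float64{}

	a := identOf("requests", "1", map[string]string{"method": "GET"})
	b := identOf("requests", "1", map[string]string{"method": "GET"})

	sums[a] += 1
	sums[b] += 2 // same identity, so this lands on the same entry

	fmt.Println(a == b, sums[a]) // prints true 3
}
```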

@0x006EA1E5 The data.Point[T] interface is not directly related to identities, as it's an abstraction over pmetric.NumberDataPoint et al. So they are the "other side" of the map[streams.Ident]data.Point
The implementations of it are also not allocating, as they merely wrap existing pmetric.* pointer types.

The Ident types are very generally useful, the data.Point ones are a bit more delta-to-cumulative focused, e.g. as they have the Add() function.
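The "other side" of the map can be sketched in the same spirit. The code below assumes a minimal `Point` interface with only an `Add` method; the real data.Point[T] wraps pmetric types and carries more than this. `Number` and `Aggregate` are hypothetical names, and a plain string stands in for the identity type.

```go
package main

import "fmt"

// Point is a hypothetical, minimal version of the data point abstraction:
// anything that can add a delta of its own type to itself.
type Point[T any] interface {
	Add(T) T
}

// Number is a toy data point holding a single value.
type Number struct{ Value float64 }

// Add returns the sum of the stored value and the incoming delta.
func (n Number) Add(delta Number) Number {
	return Number{Value: n.Value + delta.Value}
}

// Aggregate adds delta to the running total stored under id and returns the
// new total. The first delta seen for an id becomes the initial total.
func Aggregate[K comparable, P Point[P]](state map[K]P, id K, delta P) P {
	prev, ok := state[id]
	if !ok {
		state[id] = delta
		return delta
	}
	next := prev.Add(delta)
	state[id] = next
	return next
}

func main() {
	state := map[string]Number{}
	for _, d := range []float64{1, 2, 3} {
		total := Aggregate(state, "requests", Number{Value: d})
		fmt.Println(total.Value) // prints 1, then 3, then 6
	}
}
```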

@sh0rez
Contributor Author

sh0rez commented Jan 29, 2024

@0x006EA1E5 I guess it would be nice to know why the cumulativetodeltaprocessor has this include/exclude config in the first place (PR: #8952). Perhaps it predates some of the other features that can do it better?

That's a good question. Maybe @jpkrohling @codeboten @Aneurysm9 can clarify?

@jpkrohling
Member

It looks like @TylerHelmuth might be able to help here, here's the closest I could find about this: #5877 (comment)

Today, I think I'd prefer to see a connector solution to deal with include/exclude though. I would prefer to not have this include/exclude in the new component unless we have concrete use-cases for it that cannot be done otherwise, or can only be done in a way that severely impacts performance.

@TylerHelmuth
Member

There is #25161 to address filter interface for receivers.

Ultimately the cumulativetodelta allows specifying which metrics to compact because that gives users more control over their data. I tend to favor giving users absolute control, even if that means they can hurt themselves. The cumulativetodelta processor will convert everything if no include/exclude is supplied.

You can also safely choose not to allow include/exclude config at the start; it can always be added later without a breaking change.

@djaglowski
Member

It looks like @TylerHelmuth might be able to help here, here's the closest I could find about this: #5877 (comment)

Today, I think I'd prefer to see a connector solution to deal with include/exclude though. I would prefer to not have this include/exclude in the new component unless we have concrete use-cases for it that cannot be done otherwise, or can only be done in a way that severely impacts performance.

I agree with this because the alternative IMO leads to a mess of overlapping functionality. Include/exclude is not any more related to this processor than any other type of transformation which a user may need to do. We're better off decomposing the functionality so that users can manage their data however they want.

Ultimately the cumulativetodelta allows specifying which metrics to compact because that gives users more control over their data. I tend to favor giving users absolute control, even if that means they can hurt themselves.

I don't think this actually gives users more control. It just pulls a specific type of control into the component. The problem is that you could make the same case for adding almost any type of processing to almost any component. The reason we have components is so that they can be composed as necessary.

@0x006EA1E5

I don't think this actually gives users more control. It just pulls a specific type of control into the component. The problem is that you could make the same case for adding almost any type of processing to almost any component. The reason we have components is so that they can be composed as necessary.

Sorry if this is a silly question, but what would be the best way to address the use case where we want to selectively process some telemetry? 🤔

Would we use the routingconnector and the forwardconnector?

Something like:

receivers:
    otlp/in:

exporters:
    otlp/out:

processors:
    deltatocumulative:

connectors:
  routing:
    default_pipelines: [metrics/out]
    table:
      - statement: route() where <something something something>
        pipelines: [metrics/delta]

  forward:

service:
  pipelines:
    metrics/in:
      receivers: [otlp/in]
      exporters: [routing]
    metrics/delta:
      receivers: [routing]
      processors: [deltatocumulative]
      exporters: [forward]
    metrics/out:
      receivers: [routing, forward]
      exporters: [otlp/out]

?

@djaglowski
Member

Would we use the routingconnector and the forwardconnector?

Yes, I believe we should be relying on connectors for routing telemetry to the appropriate processors.

@0x006EA1E5

@sh0rez Can I ask what the plan is for the common functionality that we can expect to be shared with the implementation of #29461 (metric identity etc)?

It looks like you are doing everything within the deltatocumulative processor in the initial PR, and expecting that any subsequent processors would factor out needed common functionality, right?

Do we intend to also refactor the cumulativetodelta processor to share any common functionality we have here? Obviously, if so, we would need to be careful about any subtle changes in behaviour...

I'm wondering, have you considered simply refactoring cumulativetodelta to pull out anything shared with this processor first, as part of the initial implementation of the deltatocumulative processor?

This could mean either simply adopting the cumulativetodelta tracking implementation as part of the PR (and then re-implementing later as another issue, with the improvements you have shown), or going ahead and changing the to-be-shared common tracker now for both processors, with your improvements.

The value of this would be we don't diverge between cumulativetodelta and deltatocumulative, and it would also make it a bit easier to make a start on #29461 now, as @RichieSams could branch from your branch (Otherwise as noted elsewhere it becomes a bit difficult to review #30827).

It seems to me that we will surely have to factor out common code at some point soon in any case (as #29461 is required to make the deltatocumulative useful for the original use-case of the count connector -> prometheus remote write exporter). And it is possible that making the common code work for all three processors (cumulativetodelta, deltatocumulative, #29461) would inform the initial implementation here.

Is this a useful suggestion, or does it make things unnecessarily more difficult for you?

@sh0rez
Contributor Author

sh0rez commented Feb 13, 2024

@0x006EA1E5

I'm wondering, have you considered simply refactoring cumulativetodelta to pull out anything shared with this processor first, as part of the initial implementation of the deltatocumulative processor?

yes I considered that, however I chose against doing so for two major reasons:

  • it's internal to that processor and written in a rather specific, non-composable way, so some changes would be necessary. Adding that to the process would mean additional slow-down I was trying to avoid for this initial phase.
  • the implementation is string-based, as in all identifying data is written to a string, which is briefly hashed and immediately garbage-collected. I think we can do better from a memory perspective and I think we succeeded here. I can run benchmarks to back this up if desired.

The value of this would be we don't diverge between cumulativetodelta and deltatocumulative, and it would also make it a bit easier to make a start on #29461 now, as @RichieSams could branch from your branch (Otherwise as noted elsewhere it becomes a bit difficult to review #30827).

I factored out generic tracking in #31017 (comment) recently, which was then picked up by @RichieSams in #31089 (comment). Looking at that PR and his other work on the interval processor, the proposed api seems to be as versatile and composable as I had hoped for.

Converting the cumulativetodelta at a later point should be rather straightforward as well, bringing the performance benefits there as well.

Contributor

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

@github-actions github-actions bot added the Stale label Apr 15, 2024
@jpkrohling jpkrohling removed the Stale label Apr 30, 2024
jpkrohling added a commit that referenced this issue May 21, 2024
…code owners (#33019)

**Description:**
@jpkrohling and @djaglowski volunteered to be sponsors of the delta to
cumulative processor, and @djaglowski also volunteered to be sponsor of
the interval processor in relation to this. They should also be code
owners.

From
[CONTRIBUTING.md](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#adding-new-components):
```
A sponsor is an approver who will be in charge of being the official reviewer of the code and become a code owner for the component.
```

**Link to tracking Issue:** <Issue number if applicable>

#30479
- Delta to cumulative processor

#29461
- Interval processor

---------

Co-authored-by: Juraci Paixão Kröhling <juraci@kroehling.de>
Contributor

github-actions bot commented Jul 1, 2024

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

@github-actions github-actions bot added the Stale label Jul 1, 2024
@jpkrohling
Member

I'm closing this, as the first versions of this component are done (even though it's still under development).

8 participants