-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Add rudimentary downsampling for m3coordinator #744
Conversation
@xichen2020 @nikunjgit any feedback before I start productionizing this? |
return false | ||
} | ||
|
||
type rollupIDProvider struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add some comments explaining what this type is responsible for? It's hard to tell by just reading the code so far.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing, I'll add a comment above the type. (It just constructs a rollup ID when necessary, and can be pooled)
} | ||
|
||
func (p *rollupIDProvider) Next() bool { | ||
p.index++ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is confusing to me. Can you instead having length
simply return len(p.tagPairs)
and here do a length check first before you increment the index?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reading the code after this and coming back, it seems you are pretending that there is an extra rollup tag pair here which is to presumably satisfy the encoder interface(?), is that what's happening here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's what's happening. I'll leave a comment to this effect. I wanted to avoid another alloc of TagPairs.
p.tagPairs = tagPairs | ||
p.rollupTagIndex = -1 | ||
for idx, pair := range tagPairs { | ||
if bytes.Compare(rollupTagName, pair.Name) < 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if there is no rollup tag in the list of tag pairs provided?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(As per other comment, there is meant to be always the rollup tag injected so there will never be one in the tag pairs provided as we inject it ourselves)
return p.length() - p.index - 1 | ||
} | ||
|
||
func (p *rollupIDProvider) Duplicate() ident.TagIterator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This only does a shallow copy (e.g., the tag encoder is shared between itself and the clone). Is that ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only made this to fulfill the interface, in practice it won't be duplicated. I can make a deep copy though just to be careful.
iterPool *encodedTagsIteratorPool, | ||
) ([]byte, []byte, error) { | ||
// ID is always the encoded tags for downsampling IDs | ||
metricTags := id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The id is passed in and returned without getting used in this method, can we just remove it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's part of the method required, but I can move that to the lambda calling it perhaps.
|
||
tagsFilterOptions := filters.TagsFilterOptions{ | ||
NameTagKey: metricNameTagName, | ||
NameAndTagsFn: func(id []byte) ([]byte, []byte, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean for prom metrics name is a separate tag just like any other normal tags?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes indeed.
SetGaugePrefix(nil). | ||
SetTimerPrefix(nil) | ||
|
||
shardSet := make([]shard.Shard, numShards) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the placement always statically configured?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this downsampler always just runs local to the coordinator.
} | ||
|
||
campaignOpts = campaignOpts.SetLeaderValue(leaderValue) | ||
electionCluster := integration.NewClusterV3(nil, &integration.ClusterConfig{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:\ so the coordinator also starts an embedded single-node etcd cluster? Why not pass in the etcd cluster externally so we can use the embedded KV cluster in M3DB nodes if we choose to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we don't have an external etcd cluster. What actually needs to happen here is we need to remove the dependency of a real etcd cluster from the leader service, it should just take a struct that can fulfill some things and abstract the underlying implementation (which happens to be etcd).
That way we don't need etcd at all for just a local single node in-memory aggregator.
// NonePolicy is the none downsampling policy. | ||
NonePolicy NonePolicy `yaml:"nonePolicy"` | ||
// AggregationPolicy is the aggregation downsampling policy. | ||
AggregationPolicy AggregationPolicy `yaml:"aggregationPolicy"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So does this currently mean that if this is provided the downsampling is effectively achieved by aggregating datapoints within a resolution window?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, although you can have multiple policies enabled.
for _, s := range result.SeriesList { | ||
id := s.Name() | ||
existing, exists := r.dedupeMap[id] | ||
if exists && existing.attrs.Resolution < attrs.Resolution { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to make this configurable as opposed to always picking the finest resolution?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should yes, perhaps I'll leave a followup though as we probably don't need to provide that for v1.
Did a pass with a focus on the aggregator related logic and some of the query logic, @nikunjgit can probably do a more thorough review on the query part. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, I like the approach of making the handler figure out the downsampling and keeping that out of storage. At a high level, I have a few suggestions:
-
Consider moving downsample out of coordinator into m3aggregator. Downsampler is pretty generic already.
-
Consider separating out diffs for downsampling and clustering. Both are pretty big features and make this diff a little confusing.
-
For clustering, we should probably discuss how to avoid fanout to every cluster on every request. Doesn't look like we would ever want to do that for our internal use cases and it might be too soon to assume that external users might need it.
-
For downsampling, provide a way for it to work while the storage being a prometheus remote. It looks possible with multi-clusters but maybe we shouldn't tie them together ? People might want to downsample but may not be comfortable with managing many clusters.
@@ -0,0 +1,330 @@ | |||
// Copyright (c) 2018 Uber Technologies, Inc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
downsample_id is a little verbose since it's already in downsample ?
// Ensure encodedTagsIterator implements id.ID | ||
var _ id.ID = &encodedTagsIterator{} | ||
|
||
type encodedTagsIterator struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
feel like the tag iterator stuff should belong somewhere else rather than with the downsampler. It is not already in m3db ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It needs to live somewhere that cares about metric ID format. Since m3aggregator and m3db are both agnostic to the metric IDs and how tags are encoded they aren't the best location for this code to leave. m3coordinator however knows about the concrete formats, i.e. prometheus metric ID format. This package is the only one that needs to know about how tags are encoded for internal use in the downsampler, it would be leaky to make it need to live elsewhere and be imported into the downsample package (especially since the types are pretty nasty and concern optimizations for their use locally in this package).
instrumentOpts = o.InstrumentOptions | ||
) | ||
if o.StorageFlushConcurrency > 0 { | ||
storageFlushConcurrency = storageFlushConcurrency |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cheers, thanks for the catch.
request := newLocalWriteRequest(write, h.store) | ||
requests = append(requests, request) | ||
} | ||
writeRawErr = execution.ExecuteParallel(ctx, requests) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will block. So essentially we'd be blocking for writeRaw but not for WriteAgg ? Maybe just make everything a execution.Request and then ExecuteParallel.
@@ -0,0 +1,756 @@ | |||
// Copyright (c) 2018 Uber Technologies, Inc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this even belong within coordinator ? Consider keeping this within m3aggregator ? Seems like it's pretty generic already. You're just using a few coordinator specific things during flush, which can easily be abstracted out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this belongs in m3aggregator. This is essentially using m3aggregator as a library and this is the code that configures the various config options and creates the aggregator, which IMO should live outside the aggregator library. Additionally, there's a lot of logic here that parses tags etc, which should live outside m3aggregator since the aggregator has no notion of tags and it would be strange to introduce it there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This downsampler is essentially a single node aggregation package, currently it's not used outside of the coordinator and I doubt would really have a home anywhere else (also the only two options we should ever really offer is simple, non-HA downsampling in the m3coordinator, or proper HA downsampling with the m3aggregator in a clustered setup).
I'm inclined to leave it here until it's used anywhere else (and hopefully never used anywhere else, since you should be using the m3aggregator otherwise). Also it helps avoid needing to make it super generic and optimize it for it's use in the m3coordinator.
Codecov Report
@@ Coverage Diff @@
## master #744 +/- ##
==========================================
- Coverage 78.33% 56.7% -21.64%
==========================================
Files 355 355
Lines 30188 30730 +542
==========================================
- Hits 23649 17425 -6224
- Misses 4995 11745 +6750
- Partials 1544 1560 +16
Continue to review full report at Codecov.
|
) *encodedTagsIterator { | ||
return &encodedTagsIterator{ | ||
tagDecoder: tagDecoder, | ||
bytes: checked.NewBytes(nil, nil), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth getting this from a pool?
|
||
// TagValue returns the value for a tag value. | ||
func (it *encodedTagsIterator) TagValue(tagName []byte) ([]byte, bool) { | ||
it.tagDecoder.Reset(it.bytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this clone the tagIterator first, then reset that? If I'm reading this right if you ever call TagValue, it'll mess with the current iterator position, and if you were trying to get value for a missing name, you'd use up the iterator?
Would it be worth it to decode the tagDecoder
to a list of tags
on Reset
for this?
|
||
type encodedTagsIteratorPool struct { | ||
tagDecoderPool serialize.TagDecoderPool | ||
pool pool.ObjectPool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth giving this guy a xpool.CheckedBytesWrapperPool
too?
} | ||
|
||
func (p *encodedTagsIteratorPool) Init() { | ||
p.tagDecoderPool.Init() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a chance this is already initialized, eg if this will try to re-use an existing decoder pool?
) *encodedTagsIterator { | ||
return &encodedTagsIterator{ | ||
tagDecoder: tagDecoder, | ||
bytes: checked.NewBytes(nil, nil), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth getting this from a pool?
} | ||
|
||
func (h *downsamplerFlushHandler) Close() { | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we get a comment here? Either // noop
or // TODO
depending on what's required
func (w *downsamplerFlushHandlerWriter) Write( | ||
mp aggregated.ChunkedMetricWithStoragePolicy, | ||
) error { | ||
w.wg.Add(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems a little odd to me to have this function touching the internal wait group; what's the difference between doing it this way and calling Flush at the end v.s. the calling function handling the parallelism?
|
||
func (a *metricsAppender) SamplesAppender() (SamplesAppender, error) { | ||
// Sort tags | ||
sort.Sort(a.tags) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be better to insert the tags in the sorted location?
id := a.encodedTagsIteratorPool.Get() | ||
id.Reset(unownedID) | ||
now := time.Now() | ||
fromNanos, toNanos := now.Add(-1*a.clockOpts.MaxNegativeSkew()).UnixNano(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: now.Sub
, might be easier to read across two lines?
numRollups := matchResult.NumRollups() | ||
for i := 0; i < numRollups; i++ { | ||
rollup, ok := matchResult.RollupsAt(i, now.UnixNano()) | ||
if !ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Better to flip this and add samplesAppender if ok
Deprecating in favor of new downsampling PR #796, now that the multi-cluster support has landed. |
No description provided.