diff --git a/src/cmd/services/m3coordinator/downsample/id_pool_types.go b/src/cmd/services/m3coordinator/downsample/id_pool_types.go index b5dfb26802..85597abe35 100644 --- a/src/cmd/services/m3coordinator/downsample/id_pool_types.go +++ b/src/cmd/services/m3coordinator/downsample/id_pool_types.go @@ -65,25 +65,21 @@ func isRollupID( // method, it will return the rollup tag in the correct alphabetical order // when progressing through the existing tags. type rollupIDProvider struct { - index int - newName []byte - tagPairs []id.TagPair - nameTagIndex int - rollupTagIndex int - - tagEncoder serialize.TagEncoder - pool *rollupIDProviderPool - nameTag ident.ID - nameTagBytes []byte - nameTagBeforeRollupTag bool - tagNameID *ident.ReusableBytesID - tagValueID *ident.ReusableBytesID - mergeTags []mergeTag -} - -type mergeTag struct { - name []byte - index int + index int + len int + mergeTagsIdx int + newName []byte + tagPairs []id.TagPair + curr id.TagPair + currIdx int + + tagEncoder serialize.TagEncoder + pool *rollupIDProviderPool + nameTag ident.ID + nameTagBytes []byte + tagNameID *ident.ReusableBytesID + tagValueID *ident.ReusableBytesID + mergeTags []id.TagPair } func newRollupIDProvider( @@ -93,29 +89,27 @@ func newRollupIDProvider( ) *rollupIDProvider { nameTagBytes := nameTag.Bytes() nameTagBeforeRollupTag := bytes.Compare(nameTagBytes, rollupTagName) < 0 - mergeTags := []mergeTag{ + mergeTags := []id.TagPair{ { - name: rollupTagName, - index: -1, + Name: rollupTagName, + Value: rollupTagValue, }, { - name: nameTagBytes, - index: -1, + Name: nameTagBytes, + // Value is set in reset }, } if nameTagBeforeRollupTag { - mergeTags[0].name = nameTagBytes - mergeTags[1].name = rollupTagName + mergeTags[0], mergeTags[1] = mergeTags[1], mergeTags[0] } return &rollupIDProvider{ - tagEncoder: tagEncoder, - pool: pool, - nameTag: nameTag, - nameTagBytes: nameTagBytes, - nameTagBeforeRollupTag: nameTagBeforeRollupTag, - tagNameID: ident.NewReusableBytesID(), - tagValueID: ident.NewReusableBytesID(), - mergeTags: mergeTags, + tagEncoder: tagEncoder, + pool: pool, + nameTag: nameTag, + nameTagBytes: nameTagBytes, + tagNameID: ident.NewReusableBytesID(), + tagValueID: ident.NewReusableBytesID(), + mergeTags: mergeTags, } } @@ -143,41 +137,25 @@ func (p *rollupIDProvider) reset( newName []byte, tagPairs []id.TagPair, ) { - p.index = -1 p.newName = newName p.tagPairs = tagPairs - p.mergeTags[0].index = -1 - p.mergeTags[1].index = -1 - - // merge the special tags into the set of tag pairs without allocating extra space. - idx := 0 - for _, pair := range tagPairs { - if p.mergeTags[0].index == -1 && bytes.Compare(p.mergeTags[0].name, pair.Name) < 0 { - p.mergeTags[0].index = idx - idx++ + p.Rewind() + + var dups int + // precompute the length of the "combined" slice for the Len() method. + // mergeTags is small so it's fine to do n^2 instead of a more complicated O(n) merge scan. + for j := range p.mergeTags { + // update the name tag as well. + if bytes.Equal(p.mergeTags[j].Name, p.nameTagBytes) { + p.mergeTags[j].Value = newName } - if p.mergeTags[1].index == -1 && bytes.Compare(p.mergeTags[1].name, pair.Name) < 0 { - p.mergeTags[1].index = idx - // all tags merged, safe to break. - break + for i := range p.tagPairs { + if bytes.Equal(p.tagPairs[i].Name, p.mergeTags[j].Name) { + dups++ + } } - idx++ - } - - if p.mergeTags[0].index == -1 { - p.mergeTags[0].index = idx - idx++ - } - if p.mergeTags[1].index == -1 { - p.mergeTags[1].index = idx - } - - p.rollupTagIndex = p.mergeTags[0].index - p.nameTagIndex = p.mergeTags[1].index - if p.nameTagBeforeRollupTag { - p.nameTagIndex = p.mergeTags[0].index - p.rollupTagIndex = p.mergeTags[1].index } + p.len = len(p.tagPairs) + len(p.mergeTags) - dups } func (p *rollupIDProvider) finalize() { @@ -186,42 +164,46 @@ func (p *rollupIDProvider) finalize() { } } +// Next takes the smallest element across both sets of tags, removing any duplicates between the lists. func (p *rollupIDProvider) Next() bool { - p.index++ - return p.index < p.Len() + if p.index == len(p.tagPairs) && p.mergeTagsIdx == len(p.mergeTags) { + // at the end of both sets + return false + } + switch { + case p.index == len(p.tagPairs): + // only merged tags left + p.curr = p.mergeTags[p.mergeTagsIdx] + p.mergeTagsIdx++ + case p.mergeTagsIdx == len(p.mergeTags): + // only provided tags left + p.curr = p.tagPairs[p.index] + p.index++ + case bytes.Equal(p.tagPairs[p.index].Name, p.mergeTags[p.mergeTagsIdx].Name): + // a merge tag exists in the provided tag, advance both to prevent duplicates. + p.curr = p.tagPairs[p.index] + p.index++ + p.mergeTagsIdx++ + case bytes.Compare(p.tagPairs[p.index].Name, p.mergeTags[p.mergeTagsIdx].Name) < 0: + // the next provided tag is less + p.curr = p.tagPairs[p.index] + p.index++ + default: + // the next merge tag is less + p.curr = p.mergeTags[p.mergeTagsIdx] + p.mergeTagsIdx++ + } + p.currIdx++ + return true } func (p *rollupIDProvider) CurrentIndex() int { - if p.index >= 0 { - return p.index - } - return 0 + return p.currIdx } func (p *rollupIDProvider) Current() ident.Tag { - idx := p.index - if idx == p.nameTagIndex { - p.tagValueID.Reset(p.newName) - return ident.Tag{ - Name: p.nameTag, - Value: p.tagValueID, - } - } - if idx == p.rollupTagIndex { - return rollupTag - } - - if p.index > p.nameTagIndex { - // Effective index is subtracted by 1 - idx-- - } - if p.index > p.rollupTagIndex { - // Effective index is subtracted by 1 - idx-- - } - - p.tagNameID.Reset(p.tagPairs[idx].Name) - p.tagValueID.Reset(p.tagPairs[idx].Value) + p.tagNameID.Reset(p.curr.Name) + p.tagValueID.Reset(p.curr.Value) return ident.Tag{ Name: p.tagNameID, Value: p.tagValueID, @@ -235,11 +217,11 @@ func (p *rollupIDProvider) Err() error { func (p *rollupIDProvider) Close() {} func (p *rollupIDProvider) Len() int { - return len(p.tagPairs) + 2 + return p.len } func (p *rollupIDProvider) Remaining() int { - return p.Len() - p.index - 1 + return p.Len() - p.CurrentIndex() - 1 } func (p *rollupIDProvider) Duplicate() ident.TagIterator { @@ -249,7 +231,9 @@ func (p *rollupIDProvider) Duplicate() ident.TagIterator { } func (p *rollupIDProvider) Rewind() { - p.index = -1 + p.index = 0 + p.mergeTagsIdx = 0 + p.currIdx = -1 } type rollupIDProviderPool struct { diff --git a/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go b/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go index 8170a7db36..384f467e85 100644 --- a/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go +++ b/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go @@ -267,6 +267,39 @@ func TestRollupIdProvider(t *testing.T) { }, }, }, + { + name: "rollup and name already exists", + nameTag: "__name__", + metricName: "http_requests", + tags: []id.TagPair{ + { + Name: []byte("__name__"), + Value: []byte("http_requests"), + }, + { + Name: []byte("__rollup__"), + Value: []byte("true"), + }, + { + Name: []byte("foo"), + Value: []byte("fooValue"), + }, + }, + expectedTags: []id.TagPair{ + { + Name: []byte("__name__"), + Value: []byte("http_requests"), + }, + { + Name: []byte("__rollup__"), + Value: []byte("true"), + }, + { + Name: []byte("foo"), + Value: []byte("fooValue"), + }, + }, + }, } for _, tc := range cases { @@ -277,6 +310,20 @@ func TestRollupIdProvider(t *testing.T) { } encoder := &serialize.FakeTagEncoder{} p := newRollupIDProvider(encoder, nil, ident.BytesID(tc.nameTag)) + p.reset([]byte(tc.metricName), tc.tags) + require.Equal(t, len(tc.expectedTags), p.Len()) + curIdx := 0 + for p.Next() { + require.Equal(t, curIdx, p.CurrentIndex()) + curIdx++ + require.Equal(t, p.Len()-curIdx, p.Remaining()) + } + p.Rewind() + curIdx = 0 + for p.Next() { + require.Equal(t, curIdx, p.CurrentIndex()) + curIdx++ + } rollupID, err := p.provide([]byte(tc.metricName), tc.tags) require.NoError(t, err) encoded, _ := encoder.Data()