Skip to content

Commit

Permalink
Support histograms in pkg/storage and update other breakages (#4354)
Browse files Browse the repository at this point in the history
* Support histograms in pkg/storage and update other breakages

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
  • Loading branch information
codesome authored Mar 6, 2023
1 parent 35b6661 commit a033254
Show file tree
Hide file tree
Showing 9 changed files with 543 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newMockShardedQueryable(
sets := genLabels(labelSet, labelBuckets)
xs := make([]storage.Series, 0, len(sets))
for _, ls := range sets {
xs = append(xs, series.NewConcreteSeries(ls, samples))
xs = append(xs, series.NewConcreteSeries(ls, samples, nil))
}

return &mockShardedQueryable{
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/sharded_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func newSeriesSetFromEmbeddedQueriesResults(results [][]SampleStream, hints *sto
})
}

set = append(set, series.NewConcreteSeries(mimirpb.FromLabelAdaptersToLabels(stream.Labels), samples))
set = append(set, series.NewConcreteSeries(mimirpb.FromLabelAdaptersToLabels(stream.Labels), samples, nil))
}
}
return series.NewConcreteSeriesSet(set)
Expand Down
29 changes: 22 additions & 7 deletions pkg/querier/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,37 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/chunk"
"github.com/grafana/mimir/pkg/storage/series"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/modelutil"
)

func mergeChunks(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator {
samples := make([][]model.SamplePair, 0, len(chunks))
var (
samples = make([][]model.SamplePair, 0, len(chunks))
histograms [][]mimirpb.Histogram
mergedSamples []model.SamplePair
mergedHistograms []mimirpb.Histogram
)
for _, c := range chunks {
ss, err := c.Samples(from, through)
sf, sh, err := c.Samples(from, through)
if err != nil {
return series.NewErrIterator(err)
}

samples = append(samples, ss)
if len(sf) > 0 {
samples = append(samples, sf)
}
if len(sh) > 0 {
histograms = append(histograms, sh)
}
}
if len(histograms) > 0 {
mergedHistograms = modelutil.MergeNHistogramSets(histograms...)
}
if len(samples) > 0 {
mergedSamples = modelutil.MergeNSampleSets(samples...)
}

merged := util.MergeNSampleSets(samples...)
return series.NewConcreteSeriesIterator(series.NewConcreteSeries(nil, merged))
return series.NewConcreteSeriesIterator(series.NewConcreteSeries(nil, mergedSamples, mergedHistograms))
}
37 changes: 27 additions & 10 deletions pkg/storage/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
package chunk

import (
"fmt"
"io"
"unsafe"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/grafana/mimir/pkg/mimirpb"
)

const (
Expand Down Expand Up @@ -120,24 +123,38 @@ func NewChunk(metric labels.Labels, c EncodedChunk, from, through model.Time) Ch
}
}

// Samples returns all SamplePairs for the chunk.
func (c *Chunk) Samples(from, through model.Time) ([]model.SamplePair, error) {
// Samples returns all SamplePairs and Histograms for the chunk.
func (c *Chunk) Samples(from, through model.Time) ([]model.SamplePair, []mimirpb.Histogram, error) {
it := c.Data.NewIterator(nil)
return rangeValues(it, from, through)
}

// rangeValues is a utility function that retrieves all values within the given
// range from an Iterator.
func rangeValues(it Iterator, oldestInclusive, newestInclusive model.Time) ([]model.SamplePair, error) {
result := []model.SamplePair{}
if it.FindAtOrAfter(oldestInclusive) == chunkenc.ValNone {
return result, it.Err()
func rangeValues(it Iterator, oldestInclusive, newestInclusive model.Time) ([]model.SamplePair, []mimirpb.Histogram, error) {
resultFloat := []model.SamplePair{}
resultHist := []mimirpb.Histogram{}
currValType := it.FindAtOrAfter(oldestInclusive)
if currValType == chunkenc.ValNone {
return resultFloat, resultHist, it.Err()
}
for !it.Value().Timestamp.After(newestInclusive) {
result = append(result, it.Value())
if it.Scan() == chunkenc.ValNone {
for !model.Time(it.Timestamp()).After(newestInclusive) {
switch currValType {
case chunkenc.ValFloat:
resultFloat = append(resultFloat, it.Value())
case chunkenc.ValHistogram:
t, h := it.AtHistogram()
resultHist = append(resultHist, mimirpb.FromHistogramToHistogramProto(t, h))
case chunkenc.ValFloatHistogram:
t, h := it.AtFloatHistogram()
resultHist = append(resultHist, mimirpb.FromFloatHistogramToHistogramProto(t, h))
default:
return nil, nil, fmt.Errorf("unknown value type %v in iterator", currValType)
}
currValType = it.Scan()
if currValType == chunkenc.ValNone {
break
}
}
return result, it.Err()
return resultFloat, resultHist, it.Err()
}
136 changes: 113 additions & 23 deletions pkg/storage/series/series_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/grafana/mimir/pkg/mimirpb"
)

// ConcreteSeriesSet implements storage.SeriesSet.
Expand Down Expand Up @@ -55,15 +57,17 @@ func (c *ConcreteSeriesSet) Warnings() storage.Warnings {

// ConcreteSeries implements storage.Series.
type ConcreteSeries struct {
labels labels.Labels
samples []model.SamplePair
labels labels.Labels
samples []model.SamplePair
histograms []mimirpb.Histogram
}

// NewConcreteSeries instantiates an in memory series from a list of samples & labels
func NewConcreteSeries(ls labels.Labels, samples []model.SamplePair) *ConcreteSeries {
// NewConcreteSeries instantiates an in memory series from a list of samples & histograms & labels
func NewConcreteSeries(ls labels.Labels, samples []model.SamplePair, histograms []mimirpb.Histogram) *ConcreteSeries {
return &ConcreteSeries{
labels: ls,
samples: samples,
labels: ls,
samples: samples,
histograms: histograms,
}
}

Expand All @@ -79,52 +83,138 @@ func (c *ConcreteSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {

// concreteSeriesIterator implements chunkenc.Iterator.
type concreteSeriesIterator struct {
cur int
series *ConcreteSeries
curFloat int
curHisto int
atHisto bool
series *ConcreteSeries
}

// NewConcreteSeriesIterator instantiates an in memory chunkenc.Iterator
func NewConcreteSeriesIterator(series *ConcreteSeries) chunkenc.Iterator {
return &concreteSeriesIterator{
cur: -1,
series: series,
curFloat: -1,
curHisto: -1,
atHisto: false,
series: series,
}
}

// atTypeHisto reports which histogram flavour sits at the histogram cursor:
// chunkenc.ValFloatHistogram when the entry's IsFloatHistogram() is true,
// chunkenc.ValHistogram otherwise.
//
// It indexes c.series.histograms[c.curHisto] without a bounds check and does
// not consult c.atHisto, so the caller must already have verified that
// c.curHisto is a valid index and that the iterator is positioned on a
// histogram.
func (c *concreteSeriesIterator) atTypeHisto() chunkenc.ValueType {
	valType := chunkenc.ValHistogram
	if c.series.histograms[c.curHisto].IsFloatHistogram() {
		valType = chunkenc.ValFloatHistogram
	}
	return valType
}

// atType reports the timestamp and value type at the iterator's current
// position.
//
// The active cursor is selected by c.atHisto: c.curHisto into
// c.series.histograms when it is true, c.curFloat into c.series.samples when
// it is false. If the active cursor lies outside its slice (negative, or at or
// past the end) the result is (0, chunkenc.ValNone).
//
// NOTE(review): the 0 returned together with ValNone cannot be told apart from
// a genuine timestamp of 0. Seek compares this timestamp against its target
// before searching, so Seek(t) with t <= 0 on an iterator that is not
// positioned on a value returns ValNone without searching. Confirm that this
// is intended.
func (c *concreteSeriesIterator) atType() (int64, chunkenc.ValueType) {
	if !c.atHisto {
		idx, floats := c.curFloat, c.series.samples
		if idx >= 0 && idx < len(floats) {
			return int64(floats[idx].Timestamp), chunkenc.ValFloat
		}
		return 0, chunkenc.ValNone
	}

	idx, histos := c.curHisto, c.series.histograms
	if idx >= 0 && idx < len(histos) {
		return histos[idx].Timestamp, c.atTypeHisto()
	}
	return 0, chunkenc.ValNone
}

func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
c.cur = sort.Search(len(c.series.samples), func(n int) bool {
oldTime, oldType := c.atType()
if oldTime >= t { // only advance via Seek
return oldType
}

c.curFloat = sort.Search(len(c.series.samples), func(n int) bool {
return c.series.samples[n].Timestamp >= model.Time(t)
})
if c.cur < len(c.series.samples) {
c.curHisto = sort.Search(len(c.series.histograms), func(n int) bool {
return c.series.histograms[n].Timestamp >= t
})

if c.curFloat >= len(c.series.samples) && c.curHisto >= len(c.series.histograms) {
return chunkenc.ValNone
}
if c.curFloat >= len(c.series.samples) {
c.atHisto = true
return c.atTypeHisto()
}
if c.curHisto >= len(c.series.histograms) {
c.atHisto = false
return chunkenc.ValFloat
}
return chunkenc.ValNone
if int64(c.series.samples[c.curFloat].Timestamp) < c.series.histograms[c.curHisto].Timestamp {
c.curHisto--
c.atHisto = false
return chunkenc.ValFloat
}
c.curFloat--
c.atHisto = true
return c.atTypeHisto()
}

func (c *concreteSeriesIterator) At() (t int64, v float64) {
s := c.series.samples[c.cur]
if c.atHisto {
panic(errors.New("concreteSeriesIterator: Calling At() when cursor is at histogram"))
}
s := c.series.samples[c.curFloat]
return int64(s.Timestamp), float64(s.Value)
}

func (c *concreteSeriesIterator) Next() chunkenc.ValueType {
c.cur++
if c.cur < len(c.series.samples) {
if c.curFloat+1 >= len(c.series.samples) && c.curHisto+1 >= len(c.series.histograms) {
c.curFloat = len(c.series.samples)
c.curHisto = len(c.series.histograms)
return chunkenc.ValNone
}
if c.curFloat+1 >= len(c.series.samples) {
c.curHisto++
c.atHisto = true
return c.atTypeHisto()
}
if c.curHisto+1 >= len(c.series.histograms) {
c.curFloat++
c.atHisto = false
return chunkenc.ValFloat
}
return chunkenc.ValNone
if int64(c.series.samples[c.curFloat+1].Timestamp) < c.series.histograms[c.curHisto+1].Timestamp {
c.curFloat++
c.atHisto = false
return chunkenc.ValFloat
}
c.curHisto++
c.atHisto = true
return c.atTypeHisto()
}

func (c *concreteSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
panic(errors.New("concreteSeriesIterator: AtHistogram not implemented"))
if !c.atHisto {
panic(errors.New("concreteSeriesIterator: Calling AtHistogram() when cursor is not at histogram"))
}
h := c.series.histograms[c.curHisto]
return h.Timestamp, mimirpb.FromHistogramProtoToHistogram(&h)
}

func (c *concreteSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
panic(errors.New("concreteSeriesIterator: AtHistogram not implemented"))
if !c.atHisto {
panic(errors.New("concreteSeriesIterator: Calling AtFloatHistogram() when cursor is not at histogram"))
}
h := c.series.histograms[c.curHisto]
if h.IsFloatHistogram() {
return h.Timestamp, mimirpb.FromHistogramProtoToFloatHistogram(&h)
}
return h.Timestamp, mimirpb.FromHistogramProtoToHistogram(&h).ToFloat()
}

func (c *concreteSeriesIterator) AtT() int64 {
s := c.series.samples[c.cur]
return int64(s.Timestamp)
if c.atHisto {
return c.series.histograms[c.curHisto].Timestamp
}
return int64(c.series.samples[c.curFloat].Timestamp)
}

func (c *concreteSeriesIterator) Err() error {
Expand Down Expand Up @@ -177,6 +267,7 @@ func MatrixToSeriesSet(m model.Matrix) storage.SeriesSet {
series = append(series, &ConcreteSeries{
labels: metricToLabels(ss.Metric),
samples: ss.Values,
// histograms: ss.Histograms, // cannot convert the decoded matrix form to the expected encoded format. this method is only used in tests so ignoring histogram support for now
})
}
return NewConcreteSeriesSet(series)
Expand All @@ -187,8 +278,7 @@ func LabelsToSeriesSet(ls []labels.Labels) storage.SeriesSet {
series := make([]storage.Series, 0, len(ls))
for _, l := range ls {
series = append(series, &ConcreteSeries{
labels: l,
samples: nil,
labels: l,
})
}
return NewConcreteSeriesSet(series)
Expand Down
Loading

0 comments on commit a033254

Please sign in to comment.