Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mimirtool: add block-subset command to check whether block1 is a subset of block2 #10133

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/mimirtool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
ruleCommand commands.RuleCommand
backfillCommand commands.BackfillCommand
runtimeConfigCommand commands.RuntimeConfigCommand
blockSubsetCommand commands.BlockSubsetCommand
)

func main() {
Expand All @@ -52,6 +53,7 @@ func main() {
remoteReadCommand.Register(app, envVars)
ruleCommand.Register(app, envVars, prometheus.DefaultRegisterer)
runtimeConfigCommand.Register(app)
blockSubsetCommand.Register(app)

app.Command("version", "Get the version of the mimirtool CLI").Action(func(*kingpin.ParseContext) error {
fmt.Fprintln(os.Stdout, mimirversion.Print("Mimirtool"))
Expand Down
168 changes: 168 additions & 0 deletions pkg/mimirtool/commands/block_subset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// SPDX-License-Identifier: AGPL-3.0-only

package commands

import (
"context"
"errors"
"fmt"
"math"

"github.com/alecthomas/kingpin/v2"
"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// BlockSubsetCommand holds the configuration of the "block-subset" command.
//
// subsetBlock and supersetBlock are the directories passed to tsdb.OpenBlock
// for the block expected to be the subset and the block expected to be the
// superset, respectively.
type BlockSubsetCommand struct {
	subsetBlock, supersetBlock string
}

// Register adds the "block-subset" command and its flags to app. The command
// runs c.checkSubset when invoked.
//
// Both block paths are required: there is no meaningful default location for
// a block, so a missing flag is reported by the flag parser instead of
// silently attempting to open a placeholder directory.
func (c *BlockSubsetCommand) Register(app *kingpin.Application) {
	cmd := app.Command("block-subset", "Check whether every series and sample of one TSDB block is also present in another TSDB block.")
	cmd.Action(c.checkSubset)

	cmd.Flag("subset-block-path", "Path to the directory of the TSDB block that is expected to be the subset.").
		Required().
		StringVar(&c.subsetBlock)

	cmd.Flag("superset-block-path", "Path to the directory of the TSDB block that is expected to be the superset.").
		Required().
		StringVar(&c.supersetBlock)
}

// checkSubset is the action of the block-subset command. It opens the blocks
// at c.subsetBlock and c.supersetBlock and verifies that the former is a
// subset of the latter:
//
//   - the subset block's time range lies within the superset block's range,
//   - it has no more series and no more samples than the superset block,
//   - every series of the subset block exists in the superset block, and
//   - every sample timestamp of such a series exists in the superset series.
//
// Samples that share a timestamp but differ in value do not fail the check;
// they are counted and reported on stdout. Only float samples are supported.
//
// It returns nil when the subset relation holds, and otherwise an error
// describing the first violation or the read failure that interrupted the
// comparison.
func (c *BlockSubsetCommand) checkSubset(_ *kingpin.ParseContext) error {
	sub, err := tsdb.OpenBlock(promslog.NewNopLogger(), c.subsetBlock, nil, nil)
	if err != nil {
		return fmt.Errorf("opening subset block %q: %w", c.subsetBlock, err)
	}
	defer func() {
		// Best effort: the block is only read, so a close error is not actionable.
		_ = sub.Close()
	}()

	sup, err := tsdb.OpenBlock(promslog.NewNopLogger(), c.supersetBlock, nil, nil)
	if err != nil {
		return fmt.Errorf("opening superset block %q: %w", c.supersetBlock, err)
	}
	defer func() {
		_ = sup.Close()
	}()

	// Cheap checks on the block metadata first, so obviously incompatible
	// blocks are rejected without reading any series.
	subMeta, supMeta := sub.Meta(), sup.Meta()
	if subMeta.MinTime < supMeta.MinTime || subMeta.MaxTime > supMeta.MaxTime {
		return errors.New("subset block time range is not within the superset block time range")
	}
	if subMeta.Stats.NumSeries > supMeta.Stats.NumSeries {
		return errors.New("subset block has more series than the superset block")
	}
	if subMeta.Stats.NumSamples > supMeta.Stats.NumSamples {
		return errors.New("subset block has more samples than the superset block")
	}

	subQ, err := tsdb.NewBlockQuerier(sub, subMeta.MinTime, subMeta.MaxTime)
	if err != nil {
		return fmt.Errorf("creating querier for subset block: %w", err)
	}
	defer func() {
		_ = subQ.Close()
	}()
	supQ, err := tsdb.NewBlockQuerier(sup, supMeta.MinTime, supMeta.MaxTime)
	if err != nil {
		return fmt.Errorf("creating querier for superset block: %w", err)
	}
	defer func() {
		_ = supQ.Close()
	}()

	// Select every series from both blocks in sorted order so the two sets can
	// be walked in lockstep like a merge join.
	ctx := context.Background()
	subSeries := subQ.Select(ctx, true, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
	supSeries := supQ.Select(ctx, true, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))

	var (
		valueMismatches  int // samples with equal timestamp but different value
		seriesMismatches int // series containing at least one such sample
		subIt, supIt     chunkenc.Iterator
	)

	subHasSeries := subSeries.Next()
	supHasSeries := supSeries.Next()
	for subHasSeries {
		if !supHasSeries {
			// Distinguish a failed read from a genuinely exhausted superset.
			if err := supSeries.Err(); err != nil {
				return fmt.Errorf("iterating series of superset block: %w", err)
			}
			// TODO: mention which series
			return errors.New("subset block has series that are not in the superset block")
		}

		s1, s2 := subSeries.At(), supSeries.At()

		cmp := labels.Compare(s1.Labels(), s2.Labels())
		if cmp < 0 {
			// The superset is already past the position where s1 would be.
			// TODO: mention which series
			return errors.New("subset block has series that are not in the superset block")
		}
		if cmp > 0 {
			// The superset has a series that is not in the subset; skip it.
			supHasSeries = supSeries.Next()
			continue
		}

		// Same series in both blocks: the subset samples must all be present
		// in the superset series.
		subIt = s1.Iterator(subIt)
		supIt = s2.Iterator(supIt)
		subType, supType := subIt.Next(), supIt.Next()
		seriesMismatched := false
		for subType != chunkenc.ValNone {
			if supType == chunkenc.ValNone {
				if err := supIt.Err(); err != nil {
					return fmt.Errorf("iterating samples of series %s in superset block: %w", s2.Labels(), err)
				}
				// TODO: mention which sample for what series
				return errors.New("subset block has a sample that is not in the superset block")
			}

			// At() is only valid for float samples, so reject anything else
			// explicitly instead of misreading it.
			if subType != chunkenc.ValFloat || supType != chunkenc.ValFloat {
				return fmt.Errorf("series %s has a non-float sample (subset: %s, superset: %s); only float samples are supported", s1.Labels(), subType, supType)
			}

			subT, subV := subIt.At()
			supT, supV := supIt.At()

			switch {
			case subT < supT:
				// The superset is already past this timestamp.
				// TODO: mention which sample for what series
				return errors.New("subset block has a sample that is not in the superset block")
			case subT > supT:
				// The superset has a sample that is not in the subset; skip it.
				supType = supIt.Next()
			default:
				// Two NaN values count as equal even though NaN != NaN.
				if subV != supV && !(math.IsNaN(subV) && math.IsNaN(supV)) {
					valueMismatches++
					seriesMismatched = true
				}
				subType = subIt.Next()
				supType = supIt.Next()
			}
		}
		if err := subIt.Err(); err != nil {
			return fmt.Errorf("iterating samples of series %s in subset block: %w", s1.Labels(), err)
		}

		if seriesMismatched {
			seriesMismatches++
		}

		subHasSeries = subSeries.Next()
		supHasSeries = supSeries.Next()
	}
	// The loop also ends when reading the subset fails; do not report success
	// in that case.
	if err := subSeries.Err(); err != nil {
		return fmt.Errorf("iterating series of subset block: %w", err)
	}

	if valueMismatches > 0 {
		fmt.Printf("%d samples mismatch in value for the same timestamp in %d series\n", valueMismatches, seriesMismatches)
	}

	return nil
}
147 changes: 147 additions & 0 deletions pkg/mimirtool/commands/block_subset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package commands

import (
"context"
"errors"
"path"
"strconv"
"testing"
"time"

"github.com/prometheus/common/promslog"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/require"
)

// TestBlockSubsetCommand writes two blocks per case (a1 fills the candidate
// subset block, a2 the candidate superset block) and checks the result of
// BlockSubsetCommand.checkSubset against the expected error, if any.
//
// Every series has a single label foo="<name>" and every sample at timestamp
// ts has value float64(ts).
func TestBlockSubsetCommand(t *testing.T) {
	const (
		numSeries  = 10
		numSamples = 10
	)

	// appendSample adds one sample to app. It takes the subtest's t so that a
	// failure stops the subtest that is actually running.
	appendSample := func(t *testing.T, app storage.Appender, series string, ts int) {
		t.Helper()
		_, err := app.Append(0, labels.FromStrings("foo", series), int64(ts), float64(ts))
		require.NoError(t, err)
	}

	// forEachSample calls fn for every (series index, timestamp) pair of the
	// default numSeries x numSamples grid.
	forEachSample := func(fn func(i, j int)) {
		for i := 0; i < numSeries; i++ {
			for j := 0; j < numSamples; j++ {
				fn(i, j)
			}
		}
	}

	seriesName := func(i int) string {
		return "bar" + strconv.Itoa(i)
	}

	cases := []struct {
		name           string
		err            error
		sampleAppender func(t *testing.T, a1, a2 storage.Appender)
	}{
		{
			name: "same block",
			sampleAppender: func(t *testing.T, a1, a2 storage.Appender) {
				forEachSample(func(i, j int) {
					appendSample(t, a2, seriesName(i), j)
					appendSample(t, a1, seriesName(i), j)
				})
			},
		},
		{
			name: "samples missing in superset",
			err:  errors.New("subset block has more samples than the superset block"),
			sampleAppender: func(t *testing.T, a1, a2 storage.Appender) {
				forEachSample(func(i, j int) {
					if j == 0 || j == 9 || j%2 == 0 {
						appendSample(t, a2, seriesName(i), j)
					}
					appendSample(t, a1, seriesName(i), j)
				})
			},
		},
		{
			name: "samples missing in subset",
			sampleAppender: func(t *testing.T, a1, a2 storage.Appender) {
				forEachSample(func(i, j int) {
					if j%2 == 0 {
						appendSample(t, a1, seriesName(i), j)
					}
					appendSample(t, a2, seriesName(i), j)
				})
			},
		},
		{
			name: "series missing in superset but having same number of series and samples",
			err:  errors.New("subset block has series that are not in the superset block"),
			sampleAppender: func(t *testing.T, a1, a2 storage.Appender) {
				forEachSample(func(i, j int) {
					appendSample(t, a1, seriesName(i), j)
					if i != 5 {
						appendSample(t, a2, seriesName(i), j)
					}
				})
				// Replace the dropped series with an unrelated one so that the
				// series and sample counts of both blocks stay equal.
				for j := 0; j < numSamples; j++ {
					appendSample(t, a2, "barbar", j)
				}
			},
		},
		{
			name: "series missing in subset",
			sampleAppender: func(t *testing.T, a1, a2 storage.Appender) {
				forEachSample(func(i, j int) {
					if i%2 == 0 {
						appendSample(t, a1, seriesName(i), j)
					}
					appendSample(t, a2, seriesName(i), j)
				})
			},
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			b1Dir := t.TempDir()
			b2Dir := t.TempDir()
			w1, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), b1Dir, 2*time.Hour.Milliseconds())
			require.NoError(t, err)
			t.Cleanup(func() {
				require.NoError(t, w1.Close())
			})
			w2, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), b2Dir, 2*time.Hour.Milliseconds())
			require.NoError(t, err)
			t.Cleanup(func() {
				require.NoError(t, w2.Close())
			})

			a1 := w1.Appender(context.Background())
			a2 := w2.Appender(context.Background())

			c.sampleAppender(t, a1, a2)

			require.NoError(t, a1.Commit())
			require.NoError(t, a2.Commit())

			b1ID, err := w1.Flush(context.Background())
			require.NoError(t, err)
			b2ID, err := w2.Flush(context.Background())
			require.NoError(t, err)

			bc := BlockSubsetCommand{
				subsetBlock:   path.Join(b1Dir, b1ID.String()),
				supersetBlock: path.Join(b2Dir, b2ID.String()),
			}

			err = bc.checkSubset(nil)
			if c.err == nil {
				require.NoError(t, err)
				return
			}
			require.EqualError(t, err, c.err.Error())
		})
	}
}
Loading