Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sidecar: use prometheus metrics for min timestamp #7820

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ func runSidecar(
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()

if err := m.UpdateTimestamps(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch timestamps. Is Prometheus running? Retrying",
"err", err,
)
return err
}

if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
Expand Down Expand Up @@ -266,16 +274,21 @@ func runSidecar(
return runutil.Repeat(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()
if err := m.UpdateTimestamps(iterCtx); err != nil {
level.Warn(logger).Log("msg", "updating timestamps failed", "err", err)
promUp.Set(0)
statusProber.NotReady(err)
return nil
}

if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
level.Warn(logger).Log("msg", "updating labels failed", "err", err)
promUp.Set(0)
statusProber.NotReady(err)
} else {
promUp.Set(1)
statusProber.Ready()
return nil
}

promUp.Set(1)
statusProber.Ready()
return nil
})
}, func(error) {
Expand Down Expand Up @@ -317,7 +330,7 @@ func runSidecar(
}),
info.WithStoreInfoFunc(func() (*infopb.StoreInfo, error) {
if httpProbe.IsReady() {
mint, maxt := promStore.Timestamps()
mint, maxt := m.Timestamps()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
Expand Down Expand Up @@ -409,13 +422,6 @@ func runSidecar(
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

minTime, _, err := s.Timestamps()
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
return nil
}
m.UpdateTimestamps(minTime, math.MaxInt64)
return nil
})
}, func(error) {
Expand Down Expand Up @@ -490,16 +496,19 @@ func (s *promMetadata) UpdateLabels(ctx context.Context) error {
return nil
}

func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
func (s *promMetadata) UpdateTimestamps(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()

if mint < s.limitMinTime.PrometheusTimestamp() {
mint = s.limitMinTime.PrometheusTimestamp()
mint, err := s.client.LowestTimestamp(ctx, s.promURL)
if err != nil {
return err
}

s.mint = mint
s.maxt = maxt
s.mint = min(s.limitMinTime.PrometheusTimestamp(), mint)
s.maxt = math.MaxInt64

return nil
}

func (s *promMetadata) Labels() labels.Labels {
Expand Down
44 changes: 44 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/pkg/errors"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -687,6 +689,48 @@ func (c *Client) BuildVersion(ctx context.Context, base *url.URL) (string, error
return b.Data.Version, nil
}

// LowestTimestamp returns the lowest timestamp in the TSDB by parsing the /metrics endpoint
// and extracting the prometheus_tsdb_lowest_timestamp_seconds metric from it.
func (c *Client) LowestTimestamp(ctx context.Context, base *url.URL) (int64, error) {
u := *base
u.Path = path.Join(u.Path, "/metrics")
MichaHoffmann marked this conversation as resolved.
Show resolved Hide resolved

level.Debug(c.logger).Log("msg", "lowest timestamp", "url", u.String())

req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return 0, errors.Wrap(err, "create request")
}

span, ctx := tracing.StartSpan(ctx, "/lowest_timestamp HTTP[client]")
defer span.Finish()

resp, err := c.Do(req.WithContext(ctx))
if err != nil {
return 0, errors.Wrapf(err, "request metric against %s", u.String())
}
defer runutil.ExhaustCloseWithLogOnErr(c.logger, resp.Body, "request body")

var parser expfmt.TextParser
families, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return 0, errors.Wrapf(err, "parsing metric families against %s", u.String())
}
mf, ok := families["prometheus_tsdb_lowest_timestamp_seconds"]
if !ok {
return 0, errors.Wrapf(err, "metric families did not contain 'prometheus_tsdb_lowest_timestamp_seconds'")
}
val := 1000 * mf.GetMetric()[0].GetGauge().GetValue()

// in the case that we dont have cut a block yet, TSDB lowest timestamp is math.MaxInt64
// but its represented as float and truncated so we need to do this weird comparison.
// Since we use this for fan-out pruning we use min timestamp here to include this prometheus.
if val == float64(math.MaxInt64) {
return math.MinInt64, nil
}
return int64(val), nil
}

func formatTime(t time.Time) string {
return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/promclient/promclient_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package promclient
import (
"context"
"fmt"
"math"
"net/url"
"os"
"path"
Expand Down Expand Up @@ -83,6 +84,20 @@ func TestConfiguredFlags_e2e(t *testing.T) {
})
}

func TestLowestTimestamp_e2e(t *testing.T) {
e2eutil.ForeachPrometheus(t, func(t testing.TB, p *e2eutil.Prometheus) {
testutil.Ok(t, p.Start(context.Background(), log.NewNopLogger()))

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

ts, err := NewDefaultClient().LowestTimestamp(context.Background(), u)
testutil.Ok(t, err)

testutil.Equals(t, math.MinInt64, int(ts))
})
}

func TestSnapshot_e2e(t *testing.T) {
e2eutil.ForeachPrometheus(t, func(t testing.TB, p *e2eutil.Prometheus) {
now := time.Now()
Expand Down
Loading