Skip to content

Commit

Permalink
Sidecar: use prometheus metrics for min timestamp
Browse files Browse the repository at this point in the history
Read "minT" from prometheus metrics so that we also set it for sidecars
that are not uploading blocks.

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Oct 16, 2024
1 parent 832d17a commit c4d1de9
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 18 deletions.
45 changes: 27 additions & 18 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ func runSidecar(
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()

if err := m.UpdateTimestamps(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch timestamps. Is Prometheus running? Retrying",
"err", err,
)
return err
}

if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
Expand Down Expand Up @@ -266,16 +274,21 @@ func runSidecar(
return runutil.Repeat(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()
if err := m.UpdateTimestamps(iterCtx); err != nil {
level.Warn(logger).Log("msg", "updating timestamps failed", "err", err)
promUp.Set(0)
statusProber.NotReady(err)
return nil
}

if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
level.Warn(logger).Log("msg", "updating labels failed", "err", err)
promUp.Set(0)
statusProber.NotReady(err)
} else {
promUp.Set(1)
statusProber.Ready()
return nil
}

promUp.Set(1)
statusProber.Ready()
return nil
})
}, func(error) {
Expand Down Expand Up @@ -317,7 +330,7 @@ func runSidecar(
}),
info.WithStoreInfoFunc(func() (*infopb.StoreInfo, error) {
if httpProbe.IsReady() {
mint, maxt := promStore.Timestamps()
mint, maxt := m.Timestamps()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
Expand Down Expand Up @@ -409,13 +422,6 @@ func runSidecar(
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

minTime, _, err := s.Timestamps()
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
return nil
}
m.UpdateTimestamps(minTime, math.MaxInt64)
return nil
})
}, func(error) {
Expand Down Expand Up @@ -490,16 +496,19 @@ func (s *promMetadata) UpdateLabels(ctx context.Context) error {
return nil
}

func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
func (s *promMetadata) UpdateTimestamps(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()

if mint < s.limitMinTime.PrometheusTimestamp() {
mint = s.limitMinTime.PrometheusTimestamp()
mint, err := s.client.LowestTimestamp(ctx, s.promURL)
if err != nil {
return err
}

s.mint = mint
s.maxt = maxt
s.mint = min(s.limitMinTime.PrometheusTimestamp(), mint)
s.maxt = math.MaxInt64

return nil
}

func (s *promMetadata) Labels() labels.Labels {
Expand Down
44 changes: 44 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/pkg/errors"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -687,6 +689,48 @@ func (c *Client) BuildVersion(ctx context.Context, base *url.URL) (string, error
return b.Data.Version, nil
}

// LowestTimestamp returns the lowest timestamp in the TSDB by parsing the /metrics endpoint
// and extracting the prometheus_tsdb_lowest_timestamp_seconds metric from it.
func (c *Client) LowestTimestamp(ctx context.Context, base *url.URL) (int64, error) {
u := *base
u.Path = path.Join(u.Path, "/metrics")

level.Debug(c.logger).Log("msg", "lowest timestamp", "url", u.String())

req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return 0, errors.Wrap(err, "create request")
}

span, ctx := tracing.StartSpan(ctx, "/lowest_timestamp HTTP[client]")
defer span.Finish()

resp, err := c.Do(req.WithContext(ctx))
if err != nil {
return 0, errors.Wrapf(err, "request metric against %s", u.String())
}
defer runutil.ExhaustCloseWithLogOnErr(c.logger, resp.Body, "request body")

var parser expfmt.TextParser
families, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return 0, errors.Wrapf(err, "parsing metric families against %s", u.String())
}
mf, ok := families["prometheus_tsdb_lowest_timestamp_seconds"]
if !ok {
return 0, errors.Wrapf(err, "metric families did not contain 'prometheus_tsdb_lowest_timestamp_seconds'")
}
val := 1000 * mf.GetMetric()[0].GetGauge().GetValue()

// in the case that we dont have cut a block yet, TSDB lowest timestamp is math.MaxInt64
// but its represented as float and truncated so we need to do this weird comparison.
// Since we use this for fan-out pruning we use min timestamp here to include this prometheus.
if val == float64(math.MaxInt64) {
return math.MinInt64, nil
}
return int64(val), nil
}

func formatTime(t time.Time) string {
return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/promclient/promclient_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package promclient
import (
"context"
"fmt"
"math"
"net/url"
"os"
"path"
Expand Down Expand Up @@ -83,6 +84,20 @@ func TestConfiguredFlags_e2e(t *testing.T) {
})
}

func TestLowestTimestamp_e2e(t *testing.T) {
e2eutil.ForeachPrometheus(t, func(t testing.TB, p *e2eutil.Prometheus) {
testutil.Ok(t, p.Start(context.Background(), log.NewNopLogger()))

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

ts, err := NewDefaultClient().LowestTimestamp(context.Background(), u)
testutil.Ok(t, err)

testutil.Equals(t, math.MinInt64, int(ts))
})
}

func TestSnapshot_e2e(t *testing.T) {
e2eutil.ForeachPrometheus(t, func(t testing.TB, p *e2eutil.Prometheus) {
now := time.Now()
Expand Down

0 comments on commit c4d1de9

Please sign in to comment.