Add prepare-shutdown endpoint to ingesters for down scaling #4718

Merged · 5 commits · Apr 14, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -46,6 +46,7 @@
* [FEATURE] Distributor, ingester, querier, query-frontend, store-gateway: add experimental support for native histograms. Requires that the experimental protobuf query result response format is enabled by `-query-frontend.query-result-response-format=protobuf` on the query frontend. #4286 #4352 #4354 #4376 #4377 #4387 #4396 #4425 #4442 #4494 #4512 #4513 #4526
* [FEATURE] Added `-<prefix>.s3.storage-class` flag to configure the S3 storage class for objects written to S3 buckets. #3438
* [FEATURE] Add `freebsd` to the target OS when generating binaries for a Mimir release. #4654
* [FEATURE] Ingester: Add `prepare-shutdown` endpoint which can be used as part of Kubernetes scale down automations. #4718
* [ENHANCEMENT] Add timezone information to Alpine Docker images. #4583
* [ENHANCEMENT] Ruler: Sync rules when the ruler is JOINING the ring instead of ACTIVE, in order to reduce missed rule iterations during ruler restarts. #4451
* [ENHANCEMENT] Allow defining the service name used for tracing via the `JAEGER_SERVICE_NAME` environment variable. #4394
18 changes: 18 additions & 0 deletions docs/sources/mimir/references/http-api/index.md
@@ -44,6 +44,7 @@ This document groups API endpoints by service. Note that the API endpoints are e
| [Tenants stats](#tenants-stats) | Distributor | `GET /distributor/all_user_stats` |
| [HA tracker status](#ha-tracker-status) | Distributor | `GET /distributor/ha_tracker` |
| [Flush chunks / blocks](#flush-chunks--blocks) | Ingester | `GET,POST /ingester/flush` |
| [Prepare for Shutdown](#prepare-for-shutdown) | Ingester | `GET,POST /ingester/prepare-shutdown` |
| [Shutdown](#shutdown) | Ingester | `GET,POST /ingester/shutdown` |
| [Ingesters ring status](#ingesters-ring-status) | Distributor,Ingester | `GET /ingester/ring` |
| [Instant query](#instant-query) | Querier, Query-frontend | `GET,POST <prometheus-http-prefix>/api/v1/query` |
@@ -350,6 +351,23 @@ The flush endpoint also accepts a `wait=true` parameter, which makes the call sy

> **Note**: The returned status code does not reflect the result of the flush operation.

### Prepare for Shutdown

```
GET,POST /ingester/prepare-shutdown
```

> **Review comment (maintainer):** Let's mention DELETE too and explain what different methods do on this endpoint.

This endpoint changes the ingester's in-memory configuration to prepare for permanently stopping the ingester instance,
but does not actually stop any part of the ingester.

After the prepare-shutdown endpoint returns, when the ingester process is stopped with `SIGINT` / `SIGTERM`, the
ingester will be unregistered from the ring and in-memory time series data will be flushed to long-term storage.
This endpoint causes the ingester to be unregistered from the ring when stopped even if you disable
`-ingester.ring.unregister-on-shutdown`.

This API endpoint is usually used by Kubernetes-specific scale-down automations such as the
[rollout-operator](https://github.com/grafana/rollout-operator).
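
For illustration, here is a minimal sketch of how such an automation might call this endpoint before stopping an ingester. The `prepareShutdown` helper and the `http://ingester-2:8080` address are assumptions made up for the example; the `204 No Content` success status matches the handler added in this pull request:

```go
package main

import (
	"fmt"
	"net/http"
)

// prepareShutdown asks a single ingester to unregister from the ring and
// flush its in-memory series to long-term storage the next time it stops.
// It does not stop the ingester itself.
func prepareShutdown(baseURL string) error {
	// The endpoint accepts GET and POST; POST is used here.
	resp, err := http.Post(baseURL+"/ingester/prepare-shutdown", "", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// The handler replies 204 No Content on success, or 500 if the
	// shutdown marker file could not be written to disk.
	if resp.StatusCode != http.StatusNoContent {
		return fmt.Errorf("prepare-shutdown: unexpected status %s", resp.Status)
	}
	return nil
}

func main() {
	// Hypothetical address; use the HTTP listen address of the ingester
	// that is about to be scaled down.
	if err := prepareShutdown("http://ingester-2:8080"); err != nil {
		fmt.Println(err)
	}
}
```

After this call succeeds, stopping the ingester (for example, by deleting its pod, which sends `SIGTERM`) triggers the flush and unregistration; the on-disk marker re-applies this behavior if the ingester crashes and restarts first.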

### Shutdown

```
2 changes: 2 additions & 0 deletions pkg/api/api.go
@@ -248,6 +248,7 @@ type Ingester interface {
client.IngesterServer
FlushHandler(http.ResponseWriter, *http.Request)
ShutdownHandler(http.ResponseWriter, *http.Request)
PrepareShutdownHandler(http.ResponseWriter, *http.Request)
PushWithCleanup(context.Context, *push.Request) (*mimirpb.WriteResponse, error)
UserRegistryHandler(http.ResponseWriter, *http.Request)
}
@@ -262,6 +263,7 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
})

a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, true, "GET", "POST")
a.RegisterRoute("/ingester/prepare-shutdown", http.HandlerFunc(i.PrepareShutdownHandler), false, true, "GET", "POST")
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, true, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, i.PushWithCleanup), true, false, "POST") // For testing and debugging.
a.RegisterRoute("/ingester/tsdb_metrics", http.HandlerFunc(i.UserRegistryHandler), true, true, "GET")
41 changes: 39 additions & 2 deletions pkg/ingester/ingester.go
@@ -16,6 +16,7 @@ import (
"io"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"sync"
@@ -113,6 +114,8 @@ const (
// Value used to track the limit between sequential and concurrent TSDB openings.
// Below this value, TSDBs of different tenants are opened sequentially, otherwise concurrently.
maxTSDBOpenWithoutConcurrency = 10

shutdownMarkerFile = "shutdown-requested.txt"
)

// BlocksUploader interface is used to have an easy way to mock it in tests.
@@ -253,6 +256,8 @@ type Ingester struct {
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64

shutdownMarker *ShutdownMarker

// Anonymous usage statistics tracked by ingester.
memorySeriesStats *expvar.Int
memoryTenantsStats *expvar.Int
@@ -333,6 +338,9 @@ func New(cfg Config, limits *validation.Overrides, activeGroupsCleanupService *u

i.shipperIngesterID = i.lifecycler.ID

// Shutdown marker is part of the process of gracefully scaling down ingesters
i.shutdownMarker = NewShutdownMarker(path.Join(cfg.BlocksStorageConfig.TSDB.Dir, shutdownMarkerFile))

// Apply positive jitter only to ensure that the minimum timeout is adhered to.
i.compactionIdleTimeout = util.DurationWithPositiveJitter(i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout, compactionIdleTimeoutJitter)
level.Info(i.logger).Log("msg", "TSDB idle compaction timeout set", "timeout", i.compactionIdleTimeout)
@@ -407,7 +415,17 @@ func (i *Ingester) starting(ctx context.Context) error {
servs = append(servs, closeIdleService)
}

var err error
marker, err := i.shutdownMarker.Exists()
if err != nil {
return errors.Wrap(err, "failed to check ingester shutdown marker")
}

if marker {
level.Info(i.logger).Log("msg", "detected existing shutdown marker, setting unregister and flush on shutdown", "path", i.shutdownMarker.Path)
i.lifecycler.SetUnregisterOnShutdown(true)
i.lifecycler.SetFlushOnShutdown(true)
}

i.subservices, err = services.NewManager(servs...)
if err == nil {
err = services.StartManagerAndAwaitHealthy(ctx, i.subservices)
@@ -2283,7 +2301,7 @@ func (i *Ingester) TransferOut(_ context.Context) error {
return ring.ErrTransferDisabled
}

// This method will flush all data. It is called as part of Lifecycler's shutdown (if flush on shutdown is configured), or from the flusher.
// Flush will flush all data. It is called as part of Lifecycler's shutdown (if flush on shutdown is configured), or from the flusher.
//
// When called during Lifecycler shutdown, this happens as part of normal Ingester shutdown (see stopping method).
// Samples are not received at this stage. Compaction and Shipping loops have already been stopped as well.
@@ -2440,6 +2458,25 @@ func (i *Ingester) getInstanceLimits() *InstanceLimits {
return l
}

// PrepareShutdownHandler changes the configuration of the ingester such that when
// it is stopped, it will:
// - Change the state of ring to stop accepting writes.
// - Flush all the chunks to long-term storage.
//
// It also creates a file on disk which is used to re-apply the configuration if the
// ingester crashes and restarts before being permanently shut down.
func (i *Ingester) PrepareShutdownHandler(w http.ResponseWriter, _ *http.Request) {
if err := i.shutdownMarker.Create(); err != nil {
level.Error(i.logger).Log("msg", "unable to create prepare-shutdown marker file", "path", i.shutdownMarker.Path, "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

i.lifecycler.SetUnregisterOnShutdown(true)
i.lifecycler.SetFlushOnShutdown(true)
w.WriteHeader(http.StatusNoContent)
}

// ShutdownHandler triggers the following set of operations in order:
// - Change the state of ring to stop accepting writes.
// - Flush all the chunks.
9 changes: 9 additions & 0 deletions pkg/ingester/ingester_activity.go
@@ -140,6 +140,15 @@ func (i *ActivityTrackerWrapper) FlushHandler(w http.ResponseWriter, r *http.Req
i.ing.FlushHandler(w, r)
}

func (i *ActivityTrackerWrapper) PrepareShutdownHandler(w http.ResponseWriter, r *http.Request) {
ix := i.tracker.Insert(func() string {
return requestActivity(r.Context(), "Ingester/PrepareShutdownHandler", nil)
})
defer i.tracker.Delete(ix)

i.ing.PrepareShutdownHandler(w, r)
}

func (i *ActivityTrackerWrapper) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
ix := i.tracker.Insert(func() string {
return requestActivity(r.Context(), "Ingester/ShutdownHandler", nil)
32 changes: 32 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -3845,6 +3845,38 @@ func TestIngester_flushing(t *testing.T) {
},
},

"prepareShutdownHandler": {
setupIngester: func(cfg *Config) {
cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown = false
cfg.BlocksStorageConfig.TSDB.KeepUserTSDBOpenOnShutdown = true
},

action: func(t *testing.T, i *Ingester, reg *prometheus.Registry) {
pushSingleSampleWithMetadata(t, i)

// Nothing shipped yet.
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_ingester_shipper_uploads_total Total number of uploaded TSDB blocks
# TYPE cortex_ingester_shipper_uploads_total counter
cortex_ingester_shipper_uploads_total 0
`), "cortex_ingester_shipper_uploads_total"))

// Preparing for shutdown shouldn't actually compact or ship anything.
i.PrepareShutdownHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/ingester/prepare-shutdown", nil))
verifyCompactedHead(t, i, false)

// Shutdown ingester. This triggers compaction and flushing of the block.
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), i))
verifyCompactedHead(t, i, true)

require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_ingester_shipper_uploads_total Total number of uploaded TSDB blocks
# TYPE cortex_ingester_shipper_uploads_total counter
cortex_ingester_shipper_uploads_total 1
`), "cortex_ingester_shipper_uploads_total"))
},
},

"shutdownHandler": {
setupIngester: func(cfg *Config) {
cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown = false
100 changes: 100 additions & 0 deletions pkg/ingester/shutdown_marker.go
@@ -0,0 +1,100 @@
// SPDX-License-Identifier: AGPL-3.0-only

package ingester

import (
"fmt"
"os"
"path"
"time"
)

// ShutdownMarker is responsible for writing a marker file to disk to indicate
// that an ingester is going to be scaled down in the future. The presence of this
// file means that an ingester should flush and upload all data when stopping. This
// file is never cleaned up because the ingester pod will be deleted as part of
// scaling down.
type ShutdownMarker struct {
Path string
}

func NewShutdownMarker(path string) *ShutdownMarker {
return &ShutdownMarker{path}
}

func (m *ShutdownMarker) Create() error {
var (
dir *os.File
file *os.File
closeDir = true
closeFile = true

err error
)

defer func() {
if closeFile && file != nil {
_ = file.Close()
}

if closeDir && dir != nil {
_ = dir.Close()
}
}()

// Write the file, fsync it, then fsync the containing directory in order to guarantee
// it is persisted to disk. From https://man7.org/linux/man-pages/man2/fsync.2.html
//
// > Calling fsync() does not necessarily ensure that the entry in the
// > directory containing the file has also reached disk. For that an
// > explicit fsync() on a file descriptor for the directory is also
// > needed.

file, err = os.Create(m.Path)
if err != nil {
return fmt.Errorf("unable to create shutdown marker %s: %w", m.Path, err)
}

_, err = file.WriteString(time.Now().UTC().Format(time.RFC3339))
if err != nil {
return fmt.Errorf("unable to write shutdown marker contents %s: %w", m.Path, err)
}

if err = file.Sync(); err != nil {
return fmt.Errorf("unable to fsync shutdown marker %s: %w", m.Path, err)
}

closeFile = false
if err = file.Close(); err != nil {
return fmt.Errorf("unable to close shutdown marker %s: %w", m.Path, err)
}

dir, err = os.OpenFile(path.Dir(m.Path), os.O_RDONLY, 0777)
if err != nil {
return fmt.Errorf("unable to open shutdown marker directory %s: %w", m.Path, err)
}

if err = dir.Sync(); err != nil {
return fmt.Errorf("unable to fsync shutdown marker directory %s: %w", m.Path, err)
}

closeDir = false
if err = dir.Close(); err != nil {
return fmt.Errorf("unable to close shutdown marker directory %s: %w", m.Path, err)
}

return nil
}

func (m *ShutdownMarker) Exists() (bool, error) {
s, err := os.Stat(m.Path)
if err != nil && os.IsNotExist(err) {
return false, nil
}

if err != nil {
return false, err
}

return s.Mode().IsRegular(), nil
}
77 changes: 77 additions & 0 deletions pkg/ingester/shutdown_marker_test.go
@@ -0,0 +1,77 @@
// SPDX-License-Identifier: AGPL-3.0-only

package ingester

import (
"os"
"path"
"testing"

"github.com/stretchr/testify/require"
)

func TestShutdownMarker_Create(t *testing.T) {
t.Run("directory does not exist", func(t *testing.T) {
m := NewShutdownMarker(path.Join(t.TempDir(), "does-not-exist", "marker.txt"))
require.Error(t, m.Create())
})

t.Run("file already exists as directory", func(t *testing.T) {
dir := path.Join(t.TempDir(), "exists", "marker.txt")

require.NoError(t, os.MkdirAll(dir, 0777))

m := NewShutdownMarker(dir)
require.Error(t, m.Create())
})

t.Run("file already exists", func(t *testing.T) {
dir := path.Join(t.TempDir(), "exists")
full := path.Join(dir, "marker.txt")

require.NoError(t, os.MkdirAll(dir, 0777))
require.NoError(t, os.WriteFile(full, []byte{}, 0666))

m := NewShutdownMarker(full)
require.NoError(t, m.Create())
})

t.Run("file does exist", func(t *testing.T) {
dir := path.Join(t.TempDir(), "exists")
full := path.Join(dir, "marker.txt")

require.NoError(t, os.MkdirAll(dir, 0777))

m := NewShutdownMarker(full)
require.NoError(t, m.Create())
})
}

func TestShutdownMarker_Exists(t *testing.T) {
t.Run("file exists", func(t *testing.T) {
dir := path.Join(t.TempDir(), "exists")
full := path.Join(dir, "marker.txt")

require.NoError(t, os.MkdirAll(dir, 0777))
require.NoError(t, os.WriteFile(full, []byte{}, 0400))

m := NewShutdownMarker(full)
exists, err := m.Exists()

require.True(t, exists)
require.NoError(t, err)
})

t.Run("file does not exist", func(t *testing.T) {
dir := path.Join(t.TempDir(), "exists")
full := path.Join(dir, "marker.txt")

require.NoError(t, os.MkdirAll(dir, 0777))

m := NewShutdownMarker(full)
exists, err := m.Exists()

require.False(t, exists)
require.NoError(t, err)
})
}