Limit the maximum size of block uploads #4680

Merged · 8 commits · Apr 28, 2023
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
* [ENHANCEMENT] Store-gateway: reduce memory usage in some LabelValues calls. #4789
* [ENHANCEMENT] Store-gateway: add a `stage` label to the metric `cortex_bucket_store_series_data_touched`. This label now applies to `data_type="chunks"` and `data_type="series"`. The `stage` label has 2 values: `processed` - the number of series that were parsed - and `returned` - the number of series selected from the processed bytes to satisfy the query. #4797 #4830
* [ENHANCEMENT] Distributor: make `__meta_tenant_id` label available in relabeling rules configured via `metric_relabel_configs`. #4725
* [ENHANCEMENT] Compactor: added a configurable limit on the byte size of uploaded or validated blocks, set via the CLI flag `compactor.block-upload-max-block-size-bytes` or the per-tenant limit `compactor_block_upload_max_block_size_bytes`. #4680
* [ENHANCEMENT] Querier: reduce CPU utilisation when shuffle sharding is enabled with large shard sizes. #4851
* [ENHANCEMENT] Packaging: facilitate configuration management by instructing systemd to start mimir with a configuration file. #4810
* [ENHANCEMENT] Store-gateway: reduce memory allocations when looking up postings from cache. #4861 #4869
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -3497,6 +3497,17 @@
"fieldFlag": "compactor.block-upload-verify-chunks",
"fieldType": "boolean"
},
{
"kind": "field",
"name": "compactor_block_upload_max_block_size_bytes",
"required": false,
"desc": "Maximum size in bytes of a block that is allowed to be uploaded or validated. 0 = no limit.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "compactor.block-upload-max-block-size-bytes",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "s3_sse_type",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -853,6 +853,8 @@ Usage of ./cmd/mimir/mimir:
Number of Go routines to use when downloading blocks for compaction and uploading resulting blocks. (default 8)
-compactor.block-upload-enabled
Enable block upload API for the tenant.
-compactor.block-upload-max-block-size-bytes int
Maximum size in bytes of a block that is allowed to be uploaded or validated. 0 = no limit.
-compactor.block-upload-validation-enabled
Enable block upload validation for the tenant. (default true)
-compactor.block-upload-verify-chunks
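As an illustrative invocation (the byte value is an example, not a recommendation), the new flag is passed to the compactor like any other per-tenant limit default:

```
mimir -target=compactor -compactor.block-upload-enabled=true -compactor.block-upload-max-block-size-bytes=53687091200
```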
@@ -2868,6 +2868,11 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -compactor.block-upload-verify-chunks
[compactor_block_upload_verify_chunks: <boolean> | default = true]

# (advanced) Maximum size in bytes of a block that is allowed to be uploaded or
# validated. 0 = no limit.
# CLI flag: -compactor.block-upload-max-block-size-bytes
[compactor_block_upload_max_block_size_bytes: <int> | default = 0]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
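Because the new option lives in the `limits` block, it can also be overridden per tenant through the runtime overrides file. A minimal sketch, assuming the standard Mimir runtime-overrides layout (the tenant name and byte value are illustrative):

```yaml
overrides:
  tenant-a:
    compactor_block_upload_enabled: true
    compactor_block_upload_max_block_size_bytes: 53687091200 # 50 GiB
```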
47 changes: 38 additions & 9 deletions pkg/compactor/block_upload.go
@@ -42,6 +42,7 @@ const (
maximumMetaSizeBytes = 1 * 1024 * 1024 // 1 MiB, maximum allowed size of an uploaded block's meta.json file
)

var maxBlockUploadSizeBytesFormat = "block exceeds the maximum block size limit of %d bytes"
var rePath = regexp.MustCompile(`^(index|chunks/\d{6})$`)

// StartBlockUpload handles request for starting block upload.
@@ -152,7 +153,7 @@ func (c *MultitenantCompactor) FinishBlockUpload(w http.ResponseWriter, r *http.
decreaseActiveValidationsInDefer = false
go c.validateAndCompleteBlockUpload(logger, userBkt, blockID, m, func(ctx context.Context) error {
defer c.blockUploadValidations.Dec()
return c.validateBlock(ctx, blockID, userBkt, tenantID)
return c.validateBlock(ctx, logger, blockID, m, userBkt, tenantID)
})
} else {
if err := c.markBlockComplete(ctx, logger, userBkt, blockID, m); err != nil {
@@ -204,7 +205,7 @@ func (c *MultitenantCompactor) createBlockUpload(ctx context.Context, meta *meta
logger log.Logger, userBkt objstore.Bucket, tenantID string, blockID ulid.ULID) error {
level.Debug(logger).Log("msg", "starting block upload")

if msg := c.sanitizeMeta(logger, blockID, meta); msg != "" {
if msg := c.sanitizeMeta(logger, tenantID, blockID, meta); msg != "" {
return httpError{
message: msg,
statusCode: http.StatusBadRequest,
@@ -373,7 +374,7 @@ func (c *MultitenantCompactor) markBlockComplete(ctx context.Context, logger log

// sanitizeMeta sanitizes and validates a metadata.Meta object. If a validation error occurs, an error
// message gets returned, otherwise an empty string.
func (c *MultitenantCompactor) sanitizeMeta(logger log.Logger, blockID ulid.ULID, meta *metadata.Meta) string {
func (c *MultitenantCompactor) sanitizeMeta(logger log.Logger, userID string, blockID ulid.ULID, meta *metadata.Meta) string {
if meta == nil {
return "missing block metadata"
}
@@ -426,6 +427,10 @@ func (c *MultitenantCompactor) sanitizeMeta(logger log.Logger, blockID ulid.ULID
}
}

if err := c.validateMaximumBlockSize(logger, meta.Thanos.Files, userID); err != nil {
return err.Error()
}

if meta.Version != metadata.TSDBVersion1 {
return fmt.Sprintf("version must be %d", metadata.TSDBVersion1)
}
@@ -509,17 +514,16 @@ func (c *MultitenantCompactor) prepareBlockForValidation(ctx context.Context, us
return blockDir, nil
}

func (c *MultitenantCompactor) validateBlock(ctx context.Context, blockID ulid.ULID, userBkt objstore.Bucket, userID string) error {
blockDir, err := c.prepareBlockForValidation(ctx, userBkt, blockID)
if err != nil {
func (c *MultitenantCompactor) validateBlock(ctx context.Context, logger log.Logger, blockID ulid.ULID, blockMetadata *metadata.Meta, userBkt objstore.Bucket, userID string) error {
if err := c.validateMaximumBlockSize(logger, blockMetadata.Thanos.Files, userID); err != nil {
return err
}
defer c.removeTemporaryBlockDirectory(blockDir)

blockMetadata, err := metadata.ReadFromDir(blockDir)
blockDir, err := c.prepareBlockForValidation(ctx, userBkt, blockID)
if err != nil {
return errors.Wrap(err, "error reading block metadata file")
return err
}
defer c.removeTemporaryBlockDirectory(blockDir)

// check that all files listed in the metadata are present and have the correct size
for _, f := range blockMetadata.Thanos.Files {
@@ -547,6 +551,31 @@ func (c *MultitenantCompactor) validateBlock(ctx context.Context, blockID ulid.U
return nil
}

func (c *MultitenantCompactor) validateMaximumBlockSize(logger log.Logger, files []metadata.File, userID string) error {
maxBlockSizeBytes := c.cfgProvider.CompactorBlockUploadMaxBlockSizeBytes(userID)
if maxBlockSizeBytes <= 0 {
return nil
}

blockSizeBytes := int64(0)
for _, f := range files {
if f.SizeBytes < 0 {
return errors.New("invalid negative file size in block metadata")
}
blockSizeBytes += f.SizeBytes
if blockSizeBytes < 0 {
// overflow
break
}
}

if blockSizeBytes > maxBlockSizeBytes || blockSizeBytes < 0 {
level.Error(logger).Log("msg", "rejecting block upload for exceeding maximum size", "limit", maxBlockSizeBytes, "size", blockSizeBytes)
return fmt.Errorf(maxBlockUploadSizeBytesFormat, maxBlockSizeBytes)
}
return nil
}

type httpError struct {
message string
statusCode int
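The overflow guard in `validateMaximumBlockSize` is the subtle part of this hunk: in Go, signed integer addition wraps, so an overflowing sum of `int64` file sizes shows up as a negative running total. A minimal, self-contained sketch of the same accumulation pattern (the `sumFileSizes` name is mine, not part of the PR):

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// sumFileSizes mirrors the accumulation in validateMaximumBlockSize:
// negative file sizes are rejected outright, and an int64 overflow while
// summing wraps to a negative running total, which the caller then treats
// as exceeding any finite limit.
func sumFileSizes(sizes []int64) (int64, error) {
	total := int64(0)
	for _, s := range sizes {
		if s < 0 {
			return 0, errors.New("invalid negative file size in block metadata")
		}
		total += s
		if total < 0 {
			break // overflow: the negative total itself is the signal
		}
	}
	return total, nil
}

func main() {
	total, err := sumFileSizes([]int64{math.MaxInt64, 1})
	// Prints a negative total and <nil>: treated as over-limit, not an error.
	fmt.Println(total, err)
}
```

Note also that `validateBlock` now checks the limit from the request's metadata before `prepareBlockForValidation` copies the block locally, so oversized blocks are rejected without paying the download cost.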
121 changes: 106 additions & 15 deletions pkg/compactor/block_upload_test.go
@@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"net/url"
@@ -111,20 +112,21 @@ func TestMultitenantCompactor_StartBlockUpload(t *testing.T) {
}

testCases := []struct {
name string
tenantID string
blockID string
body string
meta *metadata.Meta
retention time.Duration
disableBlockUpload bool
expBadRequest string
expConflict string
expUnprocessableEntity string
expEntityTooLarge string
expInternalServerError bool
setUpBucketMock func(bkt *bucket.ClientMock)
verifyUpload func(*testing.T, *bucket.ClientMock)
name string
tenantID string
blockID string
body string
meta *metadata.Meta
retention time.Duration
disableBlockUpload bool
expBadRequest string
expConflict string
expUnprocessableEntity string
expEntityTooLarge string
expInternalServerError bool
setUpBucketMock func(bkt *bucket.ClientMock)
verifyUpload func(*testing.T, *bucket.ClientMock)
maxBlockUploadSizeBytes int64
}{
{
name: "missing tenant ID",
@@ -427,6 +429,15 @@
disableBlockUpload: true,
expBadRequest: "block upload is disabled",
},
{
name: "max block size exceeded",
tenantID: tenantID,
blockID: blockID,
setUpBucketMock: setUpPartialBlock,
meta: &validMeta,
maxBlockUploadSizeBytes: 1,
expBadRequest: fmt.Sprintf(maxBlockUploadSizeBytesFormat, 1),
},
{
name: "valid request",
tenantID: tenantID,
@@ -549,6 +560,7 @@ func TestMultitenantCompactor_StartBlockUpload(t *testing.T) {
cfgProvider := newMockConfigProvider()
cfgProvider.userRetentionPeriods[tenantID] = tc.retention
cfgProvider.blockUploadEnabled[tenantID] = !tc.disableBlockUpload
cfgProvider.blockUploadMaxBlockSizeBytes[tenantID] = tc.maxBlockUploadSizeBytes
c := &MultitenantCompactor{
logger: log.NewNopLogger(),
bucketClient: &bkt,
@@ -1478,6 +1490,7 @@ func TestMultitenantCompactor_ValidateBlock(t *testing.T) {
indexInject func(fname string)
chunkInject func(fname string)
populateFileList bool
maximumBlockSize int64
verifyChunks bool
missing Missing
expectError bool
@@ -1488,6 +1501,14 @@
lbls: validLabels,
populateFileList: true,
},
{
name: "maximum block size exceeded",
lbls: validLabels,
populateFileList: true,
maximumBlockSize: 1,
expectError: true,
expectedMsg: fmt.Sprintf(maxBlockUploadSizeBytesFormat, 1),
},
{
name: "missing meta file",
lbls: validLabels,
@@ -1605,6 +1626,7 @@ func TestMultitenantCompactor_ValidateBlock(t *testing.T) {
cfgProvider := newMockConfigProvider()
cfgProvider.blockUploadValidationEnabled[tenantID] = true
cfgProvider.verifyChunks[tenantID] = tc.verifyChunks
cfgProvider.blockUploadMaxBlockSizeBytes[tenantID] = tc.maximumBlockSize
c := &MultitenantCompactor{
logger: log.NewNopLogger(),
bucketClient: bkt,
@@ -1662,7 +1684,7 @@ func TestMultitenantCompactor_ValidateBlock(t *testing.T) {
}

// validate the block
err = c.validateBlock(ctx, blockID, bkt, tenantID)
err = c.validateBlock(ctx, c.logger, blockID, meta, bkt, tenantID)
if tc.expectError {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedMsg)
@@ -1869,6 +1891,75 @@ func TestMultitenantCompactor_GetBlockUploadStateHandler(t *testing.T) {
}
}

func TestMultitenantCompactor_ValidateMaximumBlockSize(t *testing.T) {
const userID = "user"

type testCase struct {
maximumBlockSize int64
fileSizes []int64
expectErr bool
}

for name, tc := range map[string]testCase{
"no limit": {
maximumBlockSize: 0,
fileSizes: []int64{math.MaxInt64},
expectErr: false,
},
"under limit": {
maximumBlockSize: 4,
fileSizes: []int64{1, 2},
expectErr: false,
},
"under limit - zero size file included": {
maximumBlockSize: 2,
fileSizes: []int64{1, 0},
expectErr: false,
},
"under limit - negative size file included": {
maximumBlockSize: 2,
fileSizes: []int64{2, -1},
expectErr: true,
},
"exact limit": {
maximumBlockSize: 3,
fileSizes: []int64{1, 2},
expectErr: false,
},
"over limit": {
maximumBlockSize: 1,
fileSizes: []int64{1, 1},
expectErr: true,
},
"overflow": {
maximumBlockSize: math.MaxInt64,
fileSizes: []int64{math.MaxInt64, math.MaxInt64, math.MaxInt64},
expectErr: true,
},
} {
t.Run(name, func(t *testing.T) {
files := make([]metadata.File, len(tc.fileSizes))
for i, size := range tc.fileSizes {
files[i] = metadata.File{SizeBytes: size}
}

cfgProvider := newMockConfigProvider()
cfgProvider.blockUploadMaxBlockSizeBytes[userID] = tc.maximumBlockSize
c := &MultitenantCompactor{
logger: log.NewNopLogger(),
cfgProvider: cfgProvider,
}

err := c.validateMaximumBlockSize(c.logger, files, userID)
if tc.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

// marshalAndUploadJSON is a test helper for uploading a meta file to a certain path in a bucket.
func marshalAndUploadJSON(t *testing.T, bkt objstore.Bucket, pth string, val interface{}) {
t.Helper()
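To run just the new unit test locally, a standard `go test` filter from the repository root should work (nothing PR-specific about the command):

```
go test ./pkg/compactor -run TestMultitenantCompactor_ValidateMaximumBlockSize
```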
6 changes: 6 additions & 0 deletions pkg/compactor/blocks_cleaner_test.go
@@ -973,6 +973,7 @@ type mockConfigProvider struct {
splitGroups map[string]int
blockUploadEnabled map[string]bool
blockUploadValidationEnabled map[string]bool
blockUploadMaxBlockSizeBytes map[string]int64
userPartialBlockDelay map[string]time.Duration
userPartialBlockDelayInvalid map[string]bool
verifyChunks map[string]bool
Expand All @@ -985,6 +986,7 @@ func newMockConfigProvider() *mockConfigProvider {
splitGroups: make(map[string]int),
blockUploadEnabled: make(map[string]bool),
blockUploadValidationEnabled: make(map[string]bool),
blockUploadMaxBlockSizeBytes: make(map[string]int64),
userPartialBlockDelay: make(map[string]time.Duration),
userPartialBlockDelayInvalid: make(map[string]bool),
verifyChunks: make(map[string]bool),
@@ -1035,6 +1037,10 @@ func (m *mockConfigProvider) CompactorBlockUploadVerifyChunks(tenantID string) b
return m.verifyChunks[tenantID]
}

func (m *mockConfigProvider) CompactorBlockUploadMaxBlockSizeBytes(user string) int64 {
return m.blockUploadMaxBlockSizeBytes[user]
}

func (m *mockConfigProvider) S3SSEType(user string) string {
return ""
}
3 changes: 3 additions & 0 deletions pkg/compactor/compactor.go
@@ -219,6 +219,9 @@ type ConfigProvider interface {

// CompactorBlockUploadVerifyChunks returns whether chunk verification is enabled for a given tenant.
CompactorBlockUploadVerifyChunks(tenantID string) bool

// CompactorBlockUploadMaxBlockSizeBytes returns the maximum size in bytes of a block that is allowed to be uploaded or validated for a given user.
CompactorBlockUploadMaxBlockSizeBytes(userID string) int64
}

// MultitenantCompactor is a multi-tenant TSDB blocks compactor based on Thanos.
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
@@ -158,6 +158,7 @@ type Limits struct {
CompactorBlockUploadEnabled bool `yaml:"compactor_block_upload_enabled" json:"compactor_block_upload_enabled"`
CompactorBlockUploadValidationEnabled bool `yaml:"compactor_block_upload_validation_enabled" json:"compactor_block_upload_validation_enabled"`
CompactorBlockUploadVerifyChunks bool `yaml:"compactor_block_upload_verify_chunks" json:"compactor_block_upload_verify_chunks"`
CompactorBlockUploadMaxBlockSizeBytes int64 `yaml:"compactor_block_upload_max_block_size_bytes" json:"compactor_block_upload_max_block_size_bytes" category:"advanced"`

// This config doesn't have a CLI flag registered here because they're registered in
// their own original config struct.
@@ -255,6 +256,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&l.CompactorBlockUploadEnabled, "compactor.block-upload-enabled", false, "Enable block upload API for the tenant.")
f.BoolVar(&l.CompactorBlockUploadValidationEnabled, "compactor.block-upload-validation-enabled", true, "Enable block upload validation for the tenant.")
f.BoolVar(&l.CompactorBlockUploadVerifyChunks, "compactor.block-upload-verify-chunks", true, "Verify chunks when uploading blocks via the upload API for the tenant.")
f.Int64Var(&l.CompactorBlockUploadMaxBlockSizeBytes, "compactor.block-upload-max-block-size-bytes", 0, "Maximum size in bytes of a block that is allowed to be uploaded or validated. 0 = no limit.")

// Query-frontend.
f.Var(&l.MaxTotalQueryLength, maxTotalQueryLengthFlag, "Limit the total query time range (end - start time). This limit is enforced in the query-frontend on the received query.")
@@ -663,6 +665,11 @@ func (o *Overrides) CompactorBlockUploadVerifyChunks(tenantID string) bool {
return o.getOverridesForUser(tenantID).CompactorBlockUploadVerifyChunks
}

// CompactorBlockUploadMaxBlockSizeBytes returns the maximum size in bytes of a block that is allowed to be uploaded or validated for a given user.
func (o *Overrides) CompactorBlockUploadMaxBlockSizeBytes(userID string) int64 {
return o.getOverridesForUser(userID).CompactorBlockUploadMaxBlockSizeBytes
}

// MetricRelabelConfigs returns the metric relabel configs for a given user.
func (o *Overrides) MetricRelabelConfigs(userID string) []*relabel.Config {
return o.getOverridesForUser(userID).MetricRelabelConfigs