Making the number of CPUs used for WAL replay configurable #4445

Merged · 6 commits merged on Mar 10, 2023
Changes from 1 commit
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
* `cortex_bucket_store_series_get_all_duration_seconds`
* `cortex_bucket_store_series_merge_duration_seconds`
* [CHANGE] Ingester: changed default value of `-blocks-storage.tsdb.retention-period` from `24h` to `13h`. If you're running Mimir with a custom configuration and you're overriding `-querier.query-store-after` to a value greater than the default `12h` then you should increase `-blocks-storage.tsdb.retention-period` accordingly. #4382
* [CHANGE] Ingester: the configuration parameter `-blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup` has been deprecated and will be removed in Mimir 2.10.
* [FEATURE] Cache: Introduce experimental support for using Redis for results, chunks, index, and metadata caches. #4371
* [ENHANCEMENT] Allow to define service name used for tracing via `JAEGER_SERVICE_NAME` environment variable. #4394
* [ENHANCEMENT] Querier and query-frontend: add experimental, more performant protobuf query result response format enabled with `-query-frontend.query-result-response-format=protobuf`. #4304 #4318 #4375
@@ -19,6 +20,7 @@
* [ENHANCEMENT] Querier and store-gateway: optimized `.*` and `.+` regular expression label matchers. #4432
* [ENHANCEMENT] Query-frontend: results cache TTL is now configurable by using `-query-frontend.results-cache-ttl` and `-query-frontend.results-cache-ttl-for-out-of-order-time-window` options. These values can also be specified per tenant. Default values are unchanged (7 days and 10 minutes respectively). #4385
* [BUGFIX] Querier: Streaming remote read will now continue to return multiple chunks per frame after the first frame. #4423
* [ENHANCEMENT] Ingester: added advanced parameter `-blocks-storage.tsdb.wal-replay-concurrency` representing the maximum number of CPUs used during WAL replay.
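Note: as the ingester changes below explain, the new parameter only takes effect when set to a positive value; at its default of 0 it is disabled and the deprecated `-blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup` keeps controlling how many TSDBs are opened concurrently on startup.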

### Mixin

13 changes: 12 additions & 1 deletion cmd/mimir/config-descriptor.json
@@ -7089,6 +7089,17 @@
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "wal_replay_concurrency",
"required": false,
"desc": "Maximum number of CPUs that can simultaneously processes WAL replay. 0 means disabled.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "blocks-storage.tsdb.wal-replay-concurrency",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "flush_blocks_on_shutdown",
@@ -7153,7 +7164,7 @@
"fieldDefaultValue": 10,
"fieldFlag": "blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup",
"fieldType": "int",
"fieldCategory": "advanced"
"fieldCategory": "deprecated"
},
{
"kind": "field",
4 changes: 3 additions & 1 deletion cmd/mimir/help-all.txt.tmpl
@@ -662,7 +662,7 @@ Usage of ./cmd/mimir/mimir:
-blocks-storage.tsdb.head-postings-for-matchers-cache-ttl duration
[experimental] How long to cache postings for matchers in the Head and OOOHead. 0 disables the cache and just deduplicates the in-flight calls. (default 10s)
-blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup int
limit the number of concurrently opening TSDB's on startup (default 10)
[deprecated] limit the number of concurrently opening TSDB's on startup (default 10)
-blocks-storage.tsdb.memory-snapshot-on-shutdown
[experimental] True to enable snapshotting of in-memory TSDB data on disk when shutting down.
-blocks-storage.tsdb.out-of-order-capacity-max int
@@ -679,6 +679,8 @@ Usage of ./cmd/mimir/mimir:
The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance. (default 16384)
-blocks-storage.tsdb.wal-compression-enabled
True to enable TSDB WAL compression.
-blocks-storage.tsdb.wal-replay-concurrency int
Maximum number of CPUs that can simultaneously process WAL replay. 0 means disabled.
-blocks-storage.tsdb.wal-segment-size-bytes int
TSDB WAL segments files max size (bytes). (default 134217728)
-common.storage.azure.account-key string
@@ -3267,6 +3267,11 @@ tsdb:
# CLI flag: -blocks-storage.tsdb.wal-segment-size-bytes
[wal_segment_size_bytes: <int> | default = 134217728]

# (advanced) Maximum number of CPUs that can simultaneously process WAL
# replay. 0 means disabled.
# CLI flag: -blocks-storage.tsdb.wal-replay-concurrency
[wal_replay_concurrency: <int> | default = 0]

# (advanced) True to flush blocks to storage on shutdown. If false, incomplete
# blocks will be reused after restart.
# CLI flag: -blocks-storage.tsdb.flush-blocks-on-shutdown
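As an illustration, a minimal configuration sketch that enables the new option (the top-level `blocks_storage` key and the value 4 are assumptions for the example; only the `wal_replay_concurrency` field comes from this change):

blocks_storage:
  tsdb:
    # Maximum number of CPUs used to process WAL replay; 0 (the default) disables the option.
    wal_replay_concurrency: 4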
@@ -3299,7 +3304,7 @@
# CLI flag: -blocks-storage.tsdb.series-hash-cache-max-size-bytes
[series_hash_cache_max_size_bytes: <int> | default = 1073741824]

# (advanced) limit the number of concurrently opening TSDB's on startup
# (deprecated) limit the number of concurrently opening TSDB's on startup
# CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]

6 changes: 3 additions & 3 deletions go.mod
@@ -39,7 +39,7 @@ require (
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20230119144549-0aaa5abd1e63
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.2.0
go.uber.org/goleak v1.2.1
golang.org/x/crypto v0.3.0
golang.org/x/net v0.7.0
golang.org/x/sync v0.1.0
@@ -88,7 +88,7 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/armon/go-metrics v0.4.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/aws/aws-sdk-go v1.44.187 // indirect
github.com/aws/aws-sdk-go v1.44.207 // indirect
github.com/aws/aws-sdk-go-v2 v1.16.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.15.1 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.11.0 // indirect
@@ -228,7 +228,7 @@
)

// Using a fork of Prometheus with Mimir-specific changes.
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20230309083841-242e82b8e667
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20230309145355-024edcdda34c

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
13 changes: 6 additions & 7 deletions go.sum
@@ -129,8 +129,8 @@ github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d h1:Byv0BzEl
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.44.187 h1:D5CsRomPnlwDHJCanL2mtaLIcbhjiWxNh5j8zvaWdJA=
github.com/aws/aws-sdk-go v1.44.187/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go v1.44.207 h1:7O0AMKxTm+/GUx6zw+3dqc+fD3tTzv8xaZPYo+ywRwE=
github.com/aws/aws-sdk-go v1.44.207/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v1.16.0 h1:cBAYjiiexRAg9v2z9vb6IdxAa7ef4KCtjW7w7e3GxGo=
github.com/aws/aws-sdk-go-v2 v1.16.0/go.mod h1:lJYcuZZEHWNIb6ugJjbQY1fykdoobWbOS7kJYb4APoI=
github.com/aws/aws-sdk-go-v2/config v1.15.1 h1:hTIZFepYESYyowQUBo47lu69WSxsYqGUILY9Nu8+7pY=
@@ -505,8 +505,8 @@ github.com/grafana/gomemcache v0.0.0-20230221082510-6cde04bf2270 h1:cj3uiNKskh+/
github.com/grafana/gomemcache v0.0.0-20230221082510-6cde04bf2270/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20230309083841-242e82b8e667 h1:fsY1JTaeHdpU2PZKrw6U5jazlaCoG8CE4As6Q0268LI=
github.com/grafana/mimir-prometheus v0.0.0-20230309083841-242e82b8e667/go.mod h1:XHIzUaYXL352XOhSF/R0eZ+/k2x6u5de/d/X9VWwVnI=
github.com/grafana/mimir-prometheus v0.0.0-20230309145355-024edcdda34c h1:4iR9RM+tvyhPWQJ5ct7x0cP1DeuAyJt4yh9dEYPyGDk=
github.com/grafana/mimir-prometheus v0.0.0-20230309145355-024edcdda34c/go.mod h1:eNd62DoOh4+xRKDW2mK5qb8RAKMSsOpw96ZyUqdlR4E=
github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6 h1:A3dhViTeFDSQcGOXuUi6ukCQSMyDtDISBp2z6OOo2YM=
github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
@@ -962,8 +962,8 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
@@ -1018,7 +1018,6 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
149 changes: 98 additions & 51 deletions pkg/ingester/ingester.go
@@ -109,6 +109,9 @@ const (
tenantsWithOutOfOrderEnabledStatName = "ingester_ooo_enabled_tenants"
minOutOfOrderTimeWindowSecondsStatName = "ingester_ooo_min_window"
maxOutOfOrderTimeWindowSecondsStatName = "ingester_ooo_max_window"

// maximum number of TSDBs present on the file system that are still opened sequentially (one at a time) when walReplayConcurrency is enabled
maxTSDBOpenWithoutConcurrency = 10
)

// BlocksUploader interface is used to have an easy way to mock it in tests.
@@ -1690,8 +1693,10 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
return db, nil
}

type tsdbOption func(*tsdb.Options)
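// tsdbOption is a functional option that adjusts tsdb.Options before a per-tenant TSDB is opened.
// Illustrative usage (not part of this change): i.createTSDB(userID, func(o *tsdb.Options) { o.WALReplayConcurrency = 2 })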

// createTSDB creates a TSDB for a given userID, and returns the created db.
func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
func (i *Ingester) createTSDB(userID string, additionalTsdbOptions ...tsdbOption) (*userTSDB, error) {
tsdbPromReg := prometheus.NewRegistry()
udir := i.cfg.BlocksStorageConfig.TSDB.BlocksDir(userID)
userLogger := util_log.WithUserID(userID, i.logger)
@@ -1712,8 +1717,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {

maxExemplars := i.limiter.convertGlobalToLocalLimit(userID, i.limits.MaxGlobalExemplarsPerUser(userID))
oooTW := i.limits.OutOfOrderTimeWindow(userID)
// Create a new user database
db, err := tsdb.Open(udir, userLogger, tsdbPromReg, &tsdb.Options{
tsdbOptions := &tsdb.Options{
RetentionDuration: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(),
MinBlockDuration: blockRanges[0],
MaxBlockDuration: blockRanges[len(blockRanges)-1],
@@ -1738,7 +1742,12 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
HeadPostingsForMatchersCacheSize: i.cfg.BlocksStorageConfig.TSDB.HeadPostingsForMatchersCacheSize,
HeadPostingsForMatchersCacheForce: i.cfg.BlocksStorageConfig.TSDB.HeadPostingsForMatchersCacheForce,
EnableNativeHistograms: i.limits.NativeHistogramsIngestionEnabled(userID),
}, nil)
}
for _, tsdbOption := range additionalTsdbOptions {
tsdbOption(tsdbOptions)
}
// Create a new user database
db, err := tsdb.Open(udir, userLogger, tsdbPromReg, tsdbOptions, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
}
@@ -1833,13 +1842,42 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
queue := make(chan string)
group, groupCtx := errgroup.WithContext(ctx)

userIDs, err := i.getAllUsersWithTSDB()
if err != nil {
level.Error(i.logger).Log("msg", "error while finding existing TSDBs", "err", err)
return err
}

if len(userIDs) == 0 {
return nil
}

var concurrentOpenTSDBCount = i.cfg.BlocksStorageConfig.TSDB.DeprecatedMaxTSDBOpeningConcurrencyOnStartup
var walReplayConcurrency = 0
// If TSDBConfig.WALReplayConcurrency is set to a positive value, we honor it and ignore the value of
// TSDB.DeprecatedMaxTSDBOpeningConcurrencyOnStartup, since the latter is deprecated.
// If TSDBConfig.WALReplayConcurrency is 0, it is ignored, and TSDB.DeprecatedMaxTSDBOpeningConcurrencyOnStartup
// determines how many TSDBs are opened concurrently.
if i.cfg.BlocksStorageConfig.TSDB.WALReplayConcurrency > 0 {
if len(userIDs) <= maxTSDBOpenWithoutConcurrency {
concurrentOpenTSDBCount = 1
walReplayConcurrency = i.cfg.BlocksStorageConfig.TSDB.WALReplayConcurrency
} else {
concurrentOpenTSDBCount = i.cfg.BlocksStorageConfig.TSDB.WALReplayConcurrency
walReplayConcurrency = 1
}
}
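// Worked example (illustrative, not part of this change): with -blocks-storage.tsdb.wal-replay-concurrency=4
// and 8 tenant TSDBs on disk, TSDBs are opened one at a time and each WAL replay may use up to 4 CPUs;
// with 40 tenant TSDBs, 4 TSDBs are opened concurrently and each WAL replay uses a single CPU.
// With the flag left at its default of 0, the deprecated flag above keeps determining how many TSDBs
// are opened concurrently.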
walReplayConcurrencyOption := func(tsdbOptions *tsdb.Options) {
tsdbOptions.WALReplayConcurrency = walReplayConcurrency
}

// Create a pool of workers which will open existing TSDBs.
for n := 0; n < i.cfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup; n++ {
for n := 0; n < concurrentOpenTSDBCount; n++ {
group.Go(func() error {
for userID := range queue {
startTime := time.Now()

db, err := i.createTSDB(userID)
db, err := i.createTSDB(userID, walReplayConcurrencyOption)
if err != nil {
level.Error(i.logger).Log("msg", "unable to open TSDB", "err", err, "user", userID)
return errors.Wrapf(err, "unable to open TSDB for user %s", userID)
@@ -1858,46 +1896,11 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
})
}

// Spawn a goroutine to find all users with a TSDB on the filesystem.
// Spawn a goroutine to place on the queue all users with a TSDB found on the filesystem.
group.Go(func() error {
// Close the queue once filesystem walking is done.
defer close(queue)

walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
// If the root directory doesn't exist, we're OK (not needed to be created upfront).
if os.IsNotExist(err) && path == i.cfg.BlocksStorageConfig.TSDB.Dir {
return filepath.SkipDir
}

level.Error(i.logger).Log("msg", "an error occurred while iterating the filesystem storing TSDBs", "path", path, "err", err)
return errors.Wrapf(err, "an error occurred while iterating the filesystem storing TSDBs at %s", path)
}

// Skip root dir and all other files
if path == i.cfg.BlocksStorageConfig.TSDB.Dir || !info.IsDir() {
return nil
}

// Top level directories are assumed to be user TSDBs
userID := info.Name()
f, err := os.Open(path)
if err != nil {
level.Error(i.logger).Log("msg", "unable to open TSDB dir", "err", err, "user", userID, "path", path)
return errors.Wrapf(err, "unable to open TSDB dir %s for user %s", path, userID)
}
defer f.Close()

// If the dir is empty skip it
if _, err := f.Readdirnames(1); err != nil {
if errors.Is(err, io.EOF) {
return filepath.SkipDir
}

level.Error(i.logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path)
return errors.Wrapf(err, "unable to read TSDB dir %s for user %s", path, userID)
}

for userID := range userIDs {
// Enqueue the user to be processed.
select {
case queue <- userID:
Expand All @@ -1906,16 +1909,12 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
// Interrupt in case a failure occurred in another goroutine.
return nil
}

// Don't descend into subdirectories.
return filepath.SkipDir
})

return errors.Wrapf(walkErr, "unable to walk directory %s containing existing TSDBs", i.cfg.BlocksStorageConfig.TSDB.Dir)
}
return nil
})

// Wait for all workers to complete.
err := group.Wait()
err = group.Wait()
if err != nil {
level.Error(i.logger).Log("msg", "error while opening existing TSDBs", "err", err)
return err
@@ -1928,6 +1927,54 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
return nil
}

// getAllUsersWithTSDB finds all users with a TSDB on the filesystem.
func (i *Ingester) getAllUsersWithTSDB() (map[string]struct{}, error) {
userIDs := make(map[string]struct{})
walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
// If the root directory doesn't exist, we're OK (not needed to be created upfront).
if os.IsNotExist(err) && path == i.cfg.BlocksStorageConfig.TSDB.Dir {
return filepath.SkipDir
}

level.Error(i.logger).Log("msg", "an error occurred while iterating the filesystem storing TSDBs", "path", path, "err", err)
return errors.Wrapf(err, "an error occurred while iterating the filesystem storing TSDBs at %s", path)
}

// Skip root dir and all other files
if path == i.cfg.BlocksStorageConfig.TSDB.Dir || !info.IsDir() {
return nil
}

// Top level directories are assumed to be user TSDBs
userID := info.Name()
f, err := os.Open(path)
if err != nil {
level.Error(i.logger).Log("msg", "unable to open TSDB dir", "err", err, "user", userID, "path", path)
return errors.Wrapf(err, "unable to open TSDB dir %s for user %s", path, userID)
}
defer f.Close()

// If the dir is empty skip it
if _, err := f.Readdirnames(1); err != nil {
if errors.Is(err, io.EOF) {
return filepath.SkipDir
}

level.Error(i.logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path)
return errors.Wrapf(err, "unable to read TSDB dir %s for user %s", path, userID)
}

// Save userId.
userIDs[userID] = struct{}{}

// Don't descend into subdirectories.
return filepath.SkipDir
})

return userIDs, errors.Wrapf(walkErr, "unable to walk directory %s containing existing TSDBs", i.cfg.BlocksStorageConfig.TSDB.Dir)
}

// getOldestUnshippedBlockMetric returns the unix timestamp of the oldest unshipped block or
// 0 if all blocks have been shipped.
func (i *Ingester) getOldestUnshippedBlockMetric() float64 {
@@ -2195,7 +2242,7 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
userDB.casState(closing, closed)

// Only remove user from TSDBState when everything is cleaned up
// This will prevent concurrency problems when cortex are trying to open new TSDB - Ie: New request for a given tenant
// This will prevent concurrency problems when cortex are trying to open new TSDB - Ie: New request for a given tenant
// came in - while closing the tsdb for the same tenant.
// If this happens now, the request will get reject as the push will not be able to acquire the lock as the tsdb will be
// in closed state