Commit 6ca9c52

kafka replay speed: remove records-per-fetch configuration (#9906)
* Remove startup records per fetch

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Remove ongoing records per fetch

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Update docker-compose

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Update comments and test names

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add CHANGELOG.md entry

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Rename used field

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Fix test

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Rename variables

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
dimitarvdimitrov authored Nov 15, 2024
1 parent 7328a9e commit 6ca9c52
Showing 13 changed files with 94 additions and 204 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
* [CHANGE] Distributor: Drop experimental `-distributor.direct-otlp-translation-enabled` flag, since direct OTLP translation is well tested at this point. #9647
* [CHANGE] Ingester: Change `-initial-delay` for circuit breakers to begin when the first request is received, rather than at breaker activation. #9842
* [CHANGE] Query-frontend: apply query pruning before query sharding instead of after. #9913
+ * [CHANGE] Ingester: remove experimental flags `-ingest-storage.kafka.ongoing-records-per-fetch` and `-ingest-storage.kafka.startup-records-per-fetch`. They are removed in favour of `-ingest-storage.kafka.max-buffered-bytes`. #9906
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874
* [FEATURE] Distributor: Add support for `lz4` OTLP compression. #9763
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
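The net effect for operators: drop the two record-count flags and rely on fetch concurrency plus the byte-based buffer cap. Below is a minimal sketch of the surviving flag surface using Go's standard flag package; it is a standalone illustration, not Mimir's actual wiring, with names and defaults taken from the diffs in this commit.

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	fs := flag.NewFlagSet("ingester", flag.ExitOnError)

	// Surviving knobs; defaults as registered in pkg/storage/ingest/config.go.
	startup := fs.Int("ingest-storage.kafka.startup-fetch-concurrency", 0,
		"concurrent fetch requests during startup; 0 disables")
	ongoing := fs.Int("ingest-storage.kafka.ongoing-fetch-concurrency", 0,
		"concurrent fetch requests after startup; 0 disables")
	maxBuffered := fs.Int("ingest-storage.kafka.max-buffered-bytes", 100_000_000,
		"max buffered bytes across all inflight fetch requests; 0 disables")

	// An invocation that previously also passed the two removed flags now
	// simply omits them; everything else stays the same.
	_ = fs.Parse([]string{
		"-ingest-storage.kafka.startup-fetch-concurrency=15",
		"-ingest-storage.kafka.ongoing-fetch-concurrency=2",
		// "-ingest-storage.kafka.startup-records-per-fetch=2400", // removed in #9906
		// "-ingest-storage.kafka.ongoing-records-per-fetch=30",   // removed in #9906
	})

	fmt.Println(*startup, *ongoing, *maxBuffered) // 15 2 100000000
}
```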
20 changes: 0 additions & 20 deletions cmd/mimir/config-descriptor.json
@@ -6713,16 +6713,6 @@
"fieldFlag": "ingest-storage.kafka.startup-fetch-concurrency",
"fieldType": "int"
},
- {
- "kind": "field",
- "name": "startup_records_per_fetch",
- "required": false,
- "desc": "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0.",
- "fieldValue": null,
- "fieldDefaultValue": 2500,
- "fieldFlag": "ingest-storage.kafka.startup-records-per-fetch",
- "fieldType": "int"
- },
{
"kind": "field",
"name": "ongoing_fetch_concurrency",
@@ -6733,16 +6723,6 @@
"fieldFlag": "ingest-storage.kafka.ongoing-fetch-concurrency",
"fieldType": "int"
},
- {
- "kind": "field",
- "name": "ongoing_records_per_fetch",
- "required": false,
- "desc": "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.",
- "fieldValue": null,
- "fieldDefaultValue": 30,
- "fieldFlag": "ingest-storage.kafka.ongoing-records-per-fetch",
- "fieldType": "int"
- },
{
"kind": "field",
"name": "use_compressed_bytes_as_fetch_max_bytes",
4 changes: 0 additions & 4 deletions cmd/mimir/help-all.txt.tmpl
@@ -1529,8 +1529,6 @@ Usage of ./cmd/mimir/mimir:
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to disable.
- -ingest-storage.kafka.ongoing-records-per-fetch int
- The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
-ingest-storage.kafka.producer-max-buffered-bytes int
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
@@ -1541,8 +1539,6 @@ Usage of ./cmd/mimir/mimir:
The username used to authenticate to Kafka using the SASL plain mechanism. To enable SASL, configure both the username and password.
-ingest-storage.kafka.startup-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
- -ingest-storage.kafka.startup-records-per-fetch int
- The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
4 changes: 0 additions & 4 deletions cmd/mimir/help.txt.tmpl
@@ -433,8 +433,6 @@ Usage of ./cmd/mimir/mimir:
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to disable.
- -ingest-storage.kafka.ongoing-records-per-fetch int
- The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
-ingest-storage.kafka.producer-max-buffered-bytes int
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
@@ -445,8 +443,6 @@ Usage of ./cmd/mimir/mimir:
The username used to authenticate to Kafka using the SASL plain mechanism. To enable SASL, configure both the username and password.
-ingest-storage.kafka.startup-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
- -ingest-storage.kafka.startup-records-per-fetch int
- The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
2 changes: 0 additions & 2 deletions development/mimir-ingest-storage/docker-compose.jsonnet
@@ -57,9 +57,7 @@ std.manifestYamlDoc({
'-ingest-storage.kafka.ingestion-concurrency=2',
'-ingest-storage.kafka.ingestion-concurrency-batch-size=150',
'-ingest-storage.kafka.startup-fetch-concurrency=15',
- '-ingest-storage.kafka.startup-records-per-fetch=2400',
'-ingest-storage.kafka.ongoing-fetch-concurrency=2',
- '-ingest-storage.kafka.ongoing-records-per-fetch=30',
],
extraVolumes: ['.data-mimir-write-zone-c-61:/data:delegated'],
}),
2 changes: 1 addition & 1 deletion development/mimir-ingest-storage/docker-compose.yml
@@ -322,7 +322,7 @@
"command":
- "sh"
- "-c"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.startup-records-per-fetch=2400 -ingest-storage.kafka.ongoing-fetch-concurrency=2 -ingest-storage.kafka.ongoing-records-per-fetch=30"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2"
"depends_on":
"kafka_1":
"condition": "service_healthy"
12 changes: 0 additions & 12 deletions docs/sources/mimir/configure/configuration-parameters/index.md
@@ -3898,25 +3898,13 @@ kafka:
# CLI flag: -ingest-storage.kafka.startup-fetch-concurrency
[startup_fetch_concurrency: <int> | default = 0]
- # The number of records per fetch request that the ingester makes when reading
- # data from Kafka during startup. Depends on
- # ingest-storage.kafka.startup-fetch-concurrency being greater than 0.
- # CLI flag: -ingest-storage.kafka.startup-records-per-fetch
- [startup_records_per_fetch: <int> | default = 2500]
-
# The number of concurrent fetch requests that the ingester makes when reading
# data continuously from Kafka after startup. Is disabled unless
# ingest-storage.kafka.startup-fetch-concurrency is greater than 0. 0 to
# disable.
# CLI flag: -ingest-storage.kafka.ongoing-fetch-concurrency
[ongoing_fetch_concurrency: <int> | default = 0]
- # The number of records per fetch request that the ingester makes when reading
- # data continuously from Kafka after startup. Depends on
- # ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.
- # CLI flag: -ingest-storage.kafka.ongoing-records-per-fetch
- [ongoing_records_per_fetch: <int> | default = 30]
-
# When enabled, the fetch request MaxBytes field is computed using the
# compressed size of previous records. When disabled, MaxBytes is computed
# using uncompressed bytes. Different Kafka implementations interpret MaxBytes
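The rationale for the byte-based replacement can be seen with simple arithmetic: a record-count cap bounds memory only indirectly, because worst-case buffered bytes are roughly concurrency times records-per-fetch times record size, and record size varies with the workload. A back-of-the-envelope sketch follows, using the development-compose values removed above; the record sizes are assumed purely for illustration.

```go
package main

import "fmt"

func main() {
	const (
		startupConcurrency = 15   // -ingest-storage.kafka.startup-fetch-concurrency (dev compose)
		recordsPerFetch    = 2400 // removed -ingest-storage.kafka.startup-records-per-fetch (dev compose)
	)
	// Assumed record sizes; real sizes depend on tenant write traffic.
	for _, recordSize := range []int{1_000, 100_000} {
		fmt.Printf("record size %6d B -> worst case %d B buffered\n",
			recordSize, startupConcurrency*recordsPerFetch*recordSize)
	}
	// The replacement bounds buffered memory directly, whatever the record size:
	fmt.Println("max-buffered-bytes cap:", 100_000_000, "B")
}
```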
8 changes: 0 additions & 8 deletions pkg/storage/ingest/config.go
@@ -103,9 +103,7 @@ type KafkaConfig struct {
FallbackClientErrorSampleRate int64 `yaml:"-"`

StartupFetchConcurrency int `yaml:"startup_fetch_concurrency"`
- StartupRecordsPerFetch int `yaml:"startup_records_per_fetch"`
OngoingFetchConcurrency int `yaml:"ongoing_fetch_concurrency"`
- OngoingRecordsPerFetch int `yaml:"ongoing_records_per_fetch"`
UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
MaxBufferedBytes int `yaml:"max_buffered_bytes"`

@@ -166,9 +164,7 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.")

f.IntVar(&cfg.StartupFetchConcurrency, prefix+".startup-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.")
- f.IntVar(&cfg.StartupRecordsPerFetch, prefix+".startup-records-per-fetch", 2500, "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on "+prefix+".startup-fetch-concurrency being greater than 0.")
f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. 0 to disable.")
- f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.")
f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
f.IntVar(&cfg.MaxBufferedBytes, prefix+".max-buffered-bytes", 100_000_000, "The maximum number of buffered records ready to be processed. This limit applies to the sum of all inflight requests. Set to 0 to disable the limit.")

@@ -220,10 +216,6 @@ func (cfg *KafkaConfig) Validate() error {
return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater than 0 when ingest-storage.kafka.ongoing-fetch-concurrency is greater than 0")
}

- if cfg.StartupRecordsPerFetch <= 0 || cfg.OngoingRecordsPerFetch <= 0 {
- return fmt.Errorf("ingest-storage.kafka.startup-records-per-fetch and ingest-storage.kafka.ongoing-records-per-fetch must be greater than 0")
- }
-
if cfg.MaxBufferedBytes >= math.MaxInt32 {
return fmt.Errorf("ingest-storage.kafka.max-buffered-bytes must be less than %d", math.MaxInt32)
}
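After the deletion, the validation that remains couples the two concurrency settings and bounds the byte cap. A condensed restatement as a sketch; the field names mirror KafkaConfig and the error messages follow the diff above, but this is illustrative, not the actual method.

```go
package main

import (
	"fmt"
	"math"
)

type kafkaCfg struct {
	startupFetchConcurrency int
	ongoingFetchConcurrency int
	maxBufferedBytes        int
}

func (c kafkaCfg) validate() error {
	// Ongoing fetch concurrency still requires startup fetch concurrency.
	if c.ongoingFetchConcurrency > 0 && c.startupFetchConcurrency <= 0 {
		return fmt.Errorf("startup-fetch-concurrency must be greater than 0 when ongoing-fetch-concurrency is greater than 0")
	}
	// The records-per-fetch check deleted above is gone; the byte cap remains.
	if c.maxBufferedBytes >= math.MaxInt32 {
		return fmt.Errorf("max-buffered-bytes must be less than %d", math.MaxInt32)
	}
	return nil
}

func main() {
	fmt.Println(kafkaCfg{ongoingFetchConcurrency: 2}.validate())     // rejected
	fmt.Println(kafkaCfg{startupFetchConcurrency: 15,
		ongoingFetchConcurrency: 2,
		maxBufferedBytes:        100_000_000}.validate()) // <nil>
}
```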