Skip to content

Commit

Permalink
ttljob: introduce sql.ttl.default_select_rate_limit cluster setting, …
Browse files Browse the repository at this point in the history
…ttl_select_rate_limit storage param

Fixes cockroachdb#110742

Release note (sql change): Add the sql.ttl.default_select_rate_limit
cluster setting and the ttl_select_rate_limit table storage param
to set the TTL select rate limit. This sets the number of records per
table per second per node that can be selected by the TTL job.
  • Loading branch information
ecwall authored and rafiss committed Dec 7, 2023
1 parent 3c7198a commit 9eed2f5
Show file tree
Hide file tree
Showing 15 changed files with 184 additions and 38 deletions.
21 changes: 12 additions & 9 deletions docs/RFCS/20220120_row_level_ttl.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,29 +140,31 @@ message TableDescriptor {
option (gogoproto.equal) = true;
// DurationExpr is the automatically assigned interval for when the TTL should apply to a row.
optional string duration_expr = 1 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"];
optional string duration_expr = 1 [(gogoproto.nullable) = false, (gogoproto.casttype) = "Expression"];
// SelectBatchSize is the amount of rows that should be fetched at a time
optional int64 select_batch_size = 2 [(gogoproto.nullable)=false];
optional int64 select_batch_size = 2 [(gogoproto.nullable) = false];
// DeleteBatchSize is the amount of rows that should be deleted at a time.
optional int64 delete_batch_size = 3 [(gogoproto.nullable)=false];
optional int64 delete_batch_size = 3 [(gogoproto.nullable) = false];
// DeletionCron signifies how often the TTL deletion job runs in a cron format.
optional string deletion_cron = 4 [(gogoproto.nullable)=false];
optional string deletion_cron = 4 [(gogoproto.nullable) = false];
// ScheduleID is the ID of the row-level TTL job schedules.
optional int64 schedule_id = 5 [(gogoproto.customname)="ScheduleID",(gogoproto.nullable)=false];
optional int64 schedule_id = 5 [(gogoproto.customname) = "ScheduleID", (gogoproto.nullable) = false];
// RangeConcurrency is based on the number of spans and is no longer configurable.
reserved 6;
// DeleteRateLimit is the maximum amount of rows to delete per second.
optional int64 delete_rate_limit = 7 [(gogoproto.nullable)=false];
optional int64 delete_rate_limit = 7 [(gogoproto.nullable) = false];
// Pause is set if the TTL job should not run.
optional bool pause = 8 [(gogoproto.nullable)=false];
optional bool pause = 8 [(gogoproto.nullable) = false];
// RowStatsPollInterval is the interval to report row statistics (number of rows on table, number of expired
// rows on table) during row level TTL. If zero, no statistics are reported.
optional int64 row_stats_poll_interval = 9 [(gogoproto.nullable)=false, (gogoproto.casttype)="time.Duration"];
optional int64 row_stats_poll_interval = 9 [(gogoproto.nullable) = false, (gogoproto.casttype) = "time.Duration"];
// LabelMetrics is true if metrics for the TTL job should add a label containing
// the relation name.
optional bool label_metrics = 10 [(gogoproto.nullable) = false];
// ExpirationExpr is the custom assigned expression for calculating when the TTL should apply to a row.
optional string expiration_expr = 11 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"];
optional string expiration_expr = 11 [(gogoproto.nullable) = false, (gogoproto.casttype) = "Expression"];
// SelectRateLimit is the maximum amount of rows to select per second.
optional int64 select_rate_limit = 12 [(gogoproto.nullable) = false];
}
// ...
Expand All @@ -182,6 +184,7 @@ the following options to control the TTL job:
| `ttl_expiration_expression` | If set, uses the expression specified as the TTL expiration. Defaults to just using the `crdb_internal_expiration` column. |
| `ttl_select_batch_size` | How many rows to fetch from the range that have expired at a given time. Defaults to 500. Must be at least `1`. |
| `ttl_delete_batch_size` | How many rows to delete at a time. Defaults to 100. Must be at least `1`. |
| `ttl_select_rate_limit` | Maximum number of rows to be selected per second (acts as the rate limit). Defaults to 0 (signifying none). |
| `ttl_delete_rate_limit` | Maximum number of rows to be deleted per second (acts as the rate limit). Defaults to 0 (signifying none). |
| `ttl_row_stats_poll_interval` | Whilst the TTL job is running, counts rows and expired rows on the table to report as prometheus metrics. By default unset, meaning no stats are fetched. |
| `ttl_pause` | Stops the TTL job from executing. |
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ sql.trace.txn.enable_threshold duration 0s enables tracing on all transactions;
sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete in a single query during a TTL job application
sql.ttl.default_delete_rate_limit integer 0 default delete rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit. application
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job application
sql.ttl.default_select_rate_limit integer 0 default select rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit. application
sql.ttl.job.enabled boolean true whether the TTL job is enabled application
sql.txn.read_committed_isolation.enabled boolean false set to true to allow transactions to use the READ COMMITTED isolation level if specified by BEGIN/SET commands application
sql.txn_fingerprint_id_cache.capacity integer 100 the maximum number of txn fingerprint IDs stored application
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@
<tr><td><div id="setting-sql-ttl-default-delete-batch-size" class="anchored"><code>sql.ttl.default_delete_batch_size</code></div></td><td>integer</td><td><code>100</code></td><td>default amount of rows to delete in a single query during a TTL job</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-ttl-default-delete-rate-limit" class="anchored"><code>sql.ttl.default_delete_rate_limit</code></div></td><td>integer</td><td><code>0</code></td><td>default delete rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-ttl-default-select-batch-size" class="anchored"><code>sql.ttl.default_select_batch_size</code></div></td><td>integer</td><td><code>500</code></td><td>default amount of rows to select in a single query during a TTL job</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-ttl-default-select-rate-limit" class="anchored"><code>sql.ttl.default_select_rate_limit</code></div></td><td>integer</td><td><code>0</code></td><td>default select rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-ttl-job-enabled" class="anchored"><code>sql.ttl.job.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>whether the TTL job is enabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-txn-read-committed-isolation-enabled" class="anchored"><code>sql.txn.read_committed_isolation.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>set to true to allow transactions to use the READ COMMITTED isolation level if specified by BEGIN/SET commands</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-txn-fingerprint-id-cache-capacity" class="anchored"><code>sql.txn_fingerprint_id_cache.capacity</code></div></td><td>integer</td><td><code>100</code></td><td>the maximum number of txn fingerprint IDs stored</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/catalog/catpb/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ message RowLevelTTL {
optional bool label_metrics = 10 [(gogoproto.nullable) = false];
// ExpirationExpr is the custom assigned expression for calculating when the TTL should apply to a row.
optional string expiration_expr = 11 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"];
// SelectRateLimit is the maximum amount of rows to select per second.
optional int64 select_rate_limit = 12 [(gogoproto.nullable)=false];
}

// AutoStatsSettings represents settings related to automatic statistics
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -2389,6 +2389,9 @@ func (desc *wrapper) GetStorageParams(spaceBetweenEqual bool) []string {
if bs := ttl.DeleteBatchSize; bs != 0 {
appendStorageParam(`ttl_delete_batch_size`, fmt.Sprintf(`%d`, bs))
}
if rl := ttl.SelectRateLimit; rl != 0 {
appendStorageParam(`ttl_select_rate_limit`, fmt.Sprintf(`%d`, rl))
}
if rl := ttl.DeleteRateLimit; rl != 0 {
appendStorageParam(`ttl_delete_rate_limit`, fmt.Sprintf(`%d`, rl))
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/catalog/tabledesc/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func ValidateRowLevelTTL(ttl *catpb.RowLevelTTL) error {
return err
}
}
if ttl.SelectRateLimit != 0 {
if err := ValidateTTLRateLimit("ttl_select_rate_limit", ttl.SelectRateLimit); err != nil {
return err
}
}
if ttl.DeleteRateLimit != 0 {
if err := ValidateTTLRateLimit("ttl_delete_rate_limit", ttl.DeleteRateLimit); err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfrapb/processors_ttl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,7 @@ message TTLSpec {
(gogoproto.customname) = "AOSTDuration",
(gogoproto.stdduration) = true
];

// SelectRateLimit controls how many records can be selected per second.
optional int64 select_rate_limit = 14 [(gogoproto.nullable) = false];
}
54 changes: 45 additions & 9 deletions pkg/sql/logictest/testdata/logic_test/row_level_ttl
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,29 @@ subtest reloptions
statement ok
CREATE TABLE tbl_reloptions (
id INT PRIMARY KEY
) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 10, ttl_delete_batch_size=20, ttl_delete_rate_limit = 30, ttl_pause = true, ttl_row_stats_poll_interval = '1 minute', ttl_label_metrics = true)
) WITH (
ttl_expire_after = '10 minutes',
ttl_select_batch_size = 10,
ttl_delete_batch_size = 20,
ttl_select_rate_limit = 30,
ttl_delete_rate_limit = 40,
ttl_pause = true,
ttl_row_stats_poll_interval = '1 minute',
ttl_label_metrics = true
)

query T
SELECT reloptions FROM pg_class WHERE relname = 'tbl_reloptions'
query T rowsort
SELECT unnest(reloptions) FROM pg_class WHERE relname = 'tbl_reloptions'
----
{ttl='on',ttl_expire_after='00:10:00':::INTERVAL,ttl_select_batch_size=10,ttl_delete_batch_size=20,ttl_delete_rate_limit=30,ttl_pause=true,ttl_row_stats_poll_interval='1m0s',ttl_label_metrics=true}
ttl='on'
ttl_expire_after='00:10:00':::INTERVAL
ttl_select_batch_size=10
ttl_delete_batch_size=20
ttl_select_rate_limit=30
ttl_delete_rate_limit=40
ttl_pause=true
ttl_row_stats_poll_interval='1m0s'
ttl_label_metrics=true

subtest end

Expand Down Expand Up @@ -724,6 +741,9 @@ ALTER TABLE tbl_ttl_params_positive SET (ttl_select_batch_size = -1)
statement error "ttl_delete_batch_size" must be at least 1
ALTER TABLE tbl_ttl_params_positive SET (ttl_delete_batch_size = -1)

statement error "ttl_select_rate_limit" must be at least 1
ALTER TABLE tbl_ttl_params_positive SET (ttl_select_rate_limit = -1)

statement error "ttl_delete_rate_limit" must be at least 1
ALTER TABLE tbl_ttl_params_positive SET (ttl_delete_rate_limit = -1)

Expand All @@ -737,7 +757,16 @@ subtest set_ttl_params
statement ok
CREATE TABLE tbl_set_ttl_params (
id INT PRIMARY KEY
) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 10, ttl_delete_batch_size=20, ttl_delete_rate_limit = 30, ttl_pause = true, ttl_row_stats_poll_interval = '1 minute', ttl_label_metrics = true)
) WITH (
ttl_expire_after = '10 minutes',
ttl_select_batch_size = 10,
ttl_delete_batch_size = 20,
ttl_select_rate_limit = 30,
ttl_delete_rate_limit = 40,
ttl_pause = true,
ttl_row_stats_poll_interval = '1 minute',
ttl_label_metrics = true
)

query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl_set_ttl_params]
Expand All @@ -746,10 +775,10 @@ CREATE TABLE public.tbl_set_ttl_params (
id INT8 NOT NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 10, ttl_delete_batch_size = 20, ttl_delete_rate_limit = 30, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 10, ttl_delete_batch_size = 20, ttl_select_rate_limit = 30, ttl_delete_rate_limit = 40, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true)

statement ok
ALTER TABLE tbl_set_ttl_params SET (ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_delete_rate_limit = 130, ttl_row_stats_poll_interval = '2m0s')
ALTER TABLE tbl_set_ttl_params SET (ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_row_stats_poll_interval = '2m0s')

query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl_set_ttl_params]
Expand All @@ -758,10 +787,17 @@ CREATE TABLE public.tbl_set_ttl_params (
id INT8 NOT NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_delete_rate_limit = 130, ttl_pause = true, ttl_row_stats_poll_interval = '2m0s', ttl_label_metrics = true)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_pause = true, ttl_row_stats_poll_interval = '2m0s', ttl_label_metrics = true)

statement ok
ALTER TABLE tbl_set_ttl_params RESET (ttl_select_batch_size, ttl_delete_batch_size, ttl_delete_rate_limit, ttl_pause, ttl_row_stats_poll_interval)
ALTER TABLE tbl_set_ttl_params RESET (
ttl_select_batch_size,
ttl_delete_batch_size,
ttl_select_rate_limit,
ttl_delete_rate_limit,
ttl_pause,
ttl_row_stats_poll_interval
)

query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl_set_ttl_params]
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/storageparam/tablestorageparam/table_storage_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,26 @@ var tableParams = map[string]tableParam{
return nil
},
},
// ttl_select_rate_limit is the table storage parameter that stores a
// per-table TTL select rate limit in RowLevelTTL.SelectRateLimit.
`ttl_select_rate_limit`: {
// onSet parses the supplied datum as an integer, validates it, and records
// it on the table's row-level TTL settings.
onSet: func(ctx context.Context, po *Setter, semaCtx *tree.SemaContext, evalCtx *eval.Context, key string, datum tree.Datum) error {
val, err := paramparse.DatumAsInt(ctx, evalCtx, key, datum)
if err != nil {
return err
}
// Reject values that tabledesc.ValidateTTLRateLimit does not accept before
// touching the descriptor.
if err := tabledesc.ValidateTTLRateLimit(key, val); err != nil {
return err
}
rowLevelTTL := po.getOrCreateRowLevelTTL()
rowLevelTTL.SelectRateLimit = val
return nil
},
// onReset clears the stored limit back to 0, but only when
// po.hasRowLevelTTL() reports true; otherwise it is a no-op.
onReset: func(_ context.Context, po *Setter, evalCtx *eval.Context, key string) error {
if po.hasRowLevelTTL() {
po.UpdatedRowLevelTTL.SelectRateLimit = 0
}
return nil
},
},
`ttl_delete_rate_limit`: {
onSet: func(ctx context.Context, po *Setter, semaCtx *tree.SemaContext, evalCtx *eval.Context, key string, datum tree.Datum) error {
val, err := paramparse.DatumAsInt(ctx, evalCtx, key, datum)
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/ttl/ttlbase/ttl_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ var (
settings.PositiveInt,
settings.WithPublic,
)
// defaultSelectRateLimit is the sql.ttl.default_select_rate_limit cluster
// setting. It defaults to 0, which the setting's description defines as
// "no rate limit", and it only accepts non-negative integers.
defaultSelectRateLimit = settings.RegisterIntSetting(
settings.ApplicationLevel,
"sql.ttl.default_select_rate_limit",
"default select rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit.",
0,
settings.NonNegativeInt,
settings.WithPublic,
)
defaultDeleteRateLimit = settings.RegisterIntSetting(
settings.ApplicationLevel,
"sql.ttl.default_delete_rate_limit",
Expand Down Expand Up @@ -93,6 +101,20 @@ func GetDeleteBatchSize(sv *settings.Values, ttl *catpb.RowLevelTTL) int64 {
return bs
}

// GetSelectRateLimit resolves the effective TTL select rate limit.
//
// The table's own SelectRateLimit takes precedence when it is non-zero.
// Otherwise the value of the defaultSelectRateLimit cluster setting is used.
// If both are zero, no limit is configured and math.MaxInt64 is returned so
// that the maximum possible number of tokens is available.
func GetSelectRateLimit(sv *settings.Values, ttl *catpb.RowLevelTTL) int64 {
	if tableLimit := ttl.SelectRateLimit; tableLimit != 0 {
		return tableLimit
	}
	if clusterLimit := defaultSelectRateLimit.Get(sv); clusterLimit != 0 {
		return clusterLimit
	}
	// Neither the table nor the cluster setting specifies a limit.
	return math.MaxInt64
}

// GetDeleteRateLimit returns the table storage param value if specified or
// falls back to the cluster setting.
func GetDeleteRateLimit(sv *settings.Values, ttl *catpb.RowLevelTTL) int64 {
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ import (
"github.com/cockroachdb/errors"
)

// rowLevelTTLResumer implements the TTL job. The job can run on any node, but
// the job node distributes SELECT/DELETE work via DistSQL to ttlProcessor
// nodes. DistSQL divides work into spans that each ttlProcessor scans in a
// SELECT/DELETE loop.
type rowLevelTTLResumer struct {
job *jobs.Job
st *cluster.Settings
Expand Down Expand Up @@ -191,6 +195,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
jobID := t.job.ID()
selectBatchSize := ttlbase.GetSelectBatchSize(settingsValues, rowLevelTTL)
deleteBatchSize := ttlbase.GetDeleteBatchSize(settingsValues, rowLevelTTL)
selectRateLimit := ttlbase.GetSelectRateLimit(settingsValues, rowLevelTTL)
deleteRateLimit := ttlbase.GetDeleteRateLimit(settingsValues, rowLevelTTL)
newTTLSpec := func(spans []roachpb.Span) *execinfrapb.TTLSpec {
return &execinfrapb.TTLSpec{
Expand All @@ -202,6 +207,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
Spans: spans,
SelectBatchSize: selectBatchSize,
DeleteBatchSize: deleteBatchSize,
SelectRateLimit: selectRateLimit,
DeleteRateLimit: deleteRateLimit,
LabelMetrics: rowLevelTTL.LabelMetrics,
PreDeleteChangeTableVersion: knobs.PreDeleteChangeTableVersion,
Expand Down
Loading

0 comments on commit 9eed2f5

Please sign in to comment.