Skip to content

Commit

Permalink
ttl: make SelectQueryBuilder, DeleteQueryBuilder params of runTTLOnQu…
Browse files Browse the repository at this point in the history
…eryBounds

By creating these structs at a higher level and passing them into
runTTLOnQueryBounds, runTTLOnQueryBounds needs fewer params, since it
no longer has to create these structs itself.

Release note: None
  • Loading branch information
ecwall authored and rafiss committed Dec 7, 2023
1 parent 0b7e838 commit 3c7198a
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 70 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/ttl/ttljob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
103 changes: 38 additions & 65 deletions pkg/sql/ttl/ttljob/ttljob_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func (t *ttlProcessor) work(ctx context.Context) error {
codec := serverCfg.Codec
details := ttlSpec.RowLevelTTLDetails
tableID := details.TableID
cutoff := details.Cutoff
ttlExpr := ttlSpec.TTLExpr

deleteRateLimit := ttlSpec.DeleteRateLimit
deleteRateLimiter := quotapool.NewRateLimiter(
Expand All @@ -73,14 +75,13 @@ func (t *ttlProcessor) work(ctx context.Context) error {
deleteRateLimit,
)

processorRowCount := int64(0)

var (
relationName string
pkColNames []string
pkColTypes []*types.T
pkColDirs []catenumpb.IndexColumn_Direction
labelMetrics bool
relationName string
pkColNames []string
pkColTypes []*types.T
pkColDirs []catenumpb.IndexColumn_Direction
labelMetrics bool
processorRowCount int64
)
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
desc, err := descsCol.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, tableID)
Expand Down Expand Up @@ -139,25 +140,46 @@ func (t *ttlProcessor) work(ctx context.Context) error {
group.GoCtx(func(ctx context.Context) error {
for bounds := range boundsChan {
start := timeutil.Now()
selectBuilder := MakeSelectQueryBuilder(
SelectQueryParams{
RelationName: relationName,
PKColNames: pkColNames,
PKColDirs: pkColDirs,
Bounds: bounds,
AOSTDuration: ttlSpec.AOSTDuration,
SelectBatchSize: ttlSpec.SelectBatchSize,
TTLExpr: ttlExpr,
SelectDuration: metrics.SelectDuration,
},
cutoff,
)
deleteBuilder := MakeDeleteQueryBuilder(
DeleteQueryParams{
RelationName: relationName,
PKColNames: pkColNames,
DeleteBatchSize: ttlSpec.DeleteBatchSize,
TTLExpr: ttlExpr,
DeleteDuration: metrics.DeleteDuration,
DeleteRateLimiter: deleteRateLimiter,
},
cutoff,
)
spanRowCount, err := t.runTTLOnQueryBounds(
ctx,
metrics,
bounds,
pkColNames,
pkColDirs,
relationName,
deleteRateLimiter,
selectBuilder,
deleteBuilder,
)
// add before returning err in case of partial success
atomic.AddInt64(&processorRowCount, spanRowCount)
metrics.SpanTotalDuration.RecordValue(int64(timeutil.Since(start)))
if err != nil {
// Continue until channel is fully read.
// Otherwise, the keys input will be blocked.
for bounds = range boundsChan {
}
return err
}
metrics.SpanTotalDuration.RecordValue(int64(timeutil.Since(start)))
}
return nil
})
Expand Down Expand Up @@ -227,11 +249,8 @@ func (t *ttlProcessor) work(ctx context.Context) error {
func (t *ttlProcessor) runTTLOnQueryBounds(
ctx context.Context,
metrics rowLevelTTLMetrics,
bounds QueryBounds,
pkColNames []string,
pkColDirs []catenumpb.IndexColumn_Direction,
relationName string,
deleteRateLimiter *quotapool.RateLimiter,
selectBuilder SelectQueryBuilder,
deleteBuilder DeleteQueryBuilder,
) (spanRowCount int64, err error) {
metrics.NumActiveSpans.Inc(1)
defer metrics.NumActiveSpans.Dec(1)
Expand All @@ -240,47 +259,10 @@ func (t *ttlProcessor) runTTLOnQueryBounds(

ttlSpec := t.ttlSpec
details := ttlSpec.RowLevelTTLDetails
cutoff := details.Cutoff
ttlExpr := ttlSpec.TTLExpr
flowCtx := t.FlowCtx
serverCfg := flowCtx.Cfg
ie := serverCfg.DB.Executor()

selectBatchSize := ttlSpec.SelectBatchSize

aostDuration := ttlSpec.AOSTDuration
if aostDuration == 0 {
// Read AOST in case of mixed 22.2.0/22.2.1+ cluster where the job started on a 22.2.0 node.
//lint:ignore SA1019 execinfrapb.TTLSpec.AOST is deprecated
aost := ttlSpec.AOST
if !aost.IsZero() {
aostDuration = aost.Sub(details.Cutoff)
}
}

selectBuilder := MakeSelectQueryBuilder(
SelectQueryParams{
RelationName: relationName,
PKColNames: pkColNames,
PKColDirs: pkColDirs,
Bounds: bounds,
AOSTDuration: ttlSpec.AOSTDuration,
SelectBatchSize: selectBatchSize,
TTLExpr: ttlExpr,
},
cutoff,
)
deleteBatchSize := ttlSpec.DeleteBatchSize
deleteBuilder := MakeDeleteQueryBuilder(
DeleteQueryParams{
RelationName: relationName,
PKColNames: pkColNames,
DeleteBatchSize: deleteBatchSize,
TTLExpr: ttlExpr,
},
cutoff,
)

preSelectStatement := ttlSpec.PreSelectStatement
if preSelectStatement != "" {
if _, err := ie.ExecEx(
Expand All @@ -303,16 +285,15 @@ func (t *ttlProcessor) runTTLOnQueryBounds(

// Step 1. Fetch some rows we want to delete using a historical
// SELECT query.
start := timeutil.Now()
expiredRowsPKs, hasNext, err := selectBuilder.Run(ctx, ie)
metrics.SelectDuration.RecordValue(int64(timeutil.Since(start)))
if err != nil {
return spanRowCount, errors.Wrapf(err, "error selecting rows to delete")
}
numExpiredRows := int64(len(expiredRowsPKs))
metrics.RowSelections.Inc(numExpiredRows)

// Step 2. Delete the rows which have expired.
deleteBatchSize := deleteBuilder.DeleteBatchSize
for startRowIdx := int64(0); startRowIdx < numExpiredRows; startRowIdx += deleteBatchSize {
until := startRowIdx + deleteBatchSize
if until > numExpiredRows {
Expand All @@ -334,18 +315,10 @@ func (t *ttlProcessor) runTTLOnQueryBounds(
desc.GetModificationTime().GoTime().Format(time.RFC3339),
)
}
tokens, err := deleteRateLimiter.Acquire(ctx, int64(len(deleteBatch)))
if err != nil {
return err
}
defer tokens.Consume()

start := timeutil.Now()
batchRowCount, err = deleteBuilder.Run(ctx, txn, deleteBatch)
if err != nil {
return err
}
metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
return nil
}
if err := serverCfg.DB.Txn(
Expand Down
28 changes: 23 additions & 5 deletions pkg/sql/ttl/ttljob/ttljob_query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -55,6 +58,7 @@ type SelectQueryParams struct {
AOSTDuration time.Duration
SelectBatchSize int64
TTLExpr catpb.Expression
SelectDuration *aggmetric.Histogram
}

// SelectQueryBuilder is responsible for maintaining state around the SELECT
Expand Down Expand Up @@ -129,6 +133,7 @@ func (b *SelectQueryBuilder) Run(
query = b.cachedQuery
}

start := timeutil.Now()
// Use a nil txn so that the AOST clause is handled correctly. Currently,
// the internal executor will treat a passed-in txn as an explicit txn, so
// the AOST clause on the SELECT query would not be interpreted correctly.
Expand All @@ -146,6 +151,7 @@ func (b *SelectQueryBuilder) Run(
if err != nil {
return nil, false, err
}
b.SelectDuration.RecordValue(int64(timeutil.Since(start)))

numRows := int64(len(rows))
if numRows > 0 {
Expand All @@ -165,10 +171,12 @@ func (b *SelectQueryBuilder) Run(
}

type DeleteQueryParams struct {
RelationName string
PKColNames []string
DeleteBatchSize int64
TTLExpr catpb.Expression
RelationName string
PKColNames []string
DeleteBatchSize int64
TTLExpr catpb.Expression
DeleteDuration *aggmetric.Histogram
DeleteRateLimiter *quotapool.RateLimiter
}

// DeleteQueryBuilder is responsible for maintaining state around the DELETE
Expand Down Expand Up @@ -228,6 +236,12 @@ func (b *DeleteQueryBuilder) Run(
}
}

tokens, err := b.DeleteRateLimiter.Acquire(ctx, int64(numRows))
if err != nil {
return 0, err
}
defer tokens.Consume()
start := timeutil.Now()
rowCount, err := txn.ExecEx(
ctx,
b.deleteOpName,
Expand All @@ -239,5 +253,9 @@ func (b *DeleteQueryBuilder) Run(
query,
deleteArgs...,
)
return int64(rowCount), err
if err != nil {
return 0, err
}
b.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
return int64(rowCount), nil
}
11 changes: 11 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob_query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package ttljob_test
import (
"context"
"fmt"
"math"
"strconv"
"strings"
"testing"
Expand All @@ -28,6 +29,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -341,6 +345,7 @@ func TestSelectQueryBuilder(t *testing.T) {
AOSTDuration: 0,
SelectBatchSize: 2,
TTLExpr: ttlColName,
SelectDuration: aggmetric.MakeBuilder().Histogram(metric.HistogramOptions{}).AddChild(),
},
cutoff,
)
Expand Down Expand Up @@ -453,6 +458,12 @@ func TestDeleteQueryBuilder(t *testing.T) {
PKColNames: pkColNames,
DeleteBatchSize: 2,
TTLExpr: ttlColName,
DeleteDuration: aggmetric.MakeBuilder().Histogram(metric.HistogramOptions{}).AddChild(),
DeleteRateLimiter: quotapool.NewRateLimiter(
"",
quotapool.Inf(),
math.MaxInt64,
),
},
cutoff,
)
Expand Down

0 comments on commit 3c7198a

Please sign in to comment.