Skip to content

Commit

Permalink
redo(ticdc): enable pprof and set memory limit for redo applier (#10904)
Browse files Browse the repository at this point in the history
close #10900
  • Loading branch information
CharlesCheung96 authored Apr 27, 2024
1 parent d0329d7 commit 35703b2
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 17 deletions.
7 changes: 1 addition & 6 deletions cdc/redo/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
// Tunables for the redo log reader.
const (
// emitBatch mirrors mysql.DefaultMaxTxnRow.
// NOTE(review): presumably the number of rows emitted per batch — confirm against its users.
emitBatch = mysql.DefaultMaxTxnRow
// defaultReaderChanSize is mysql.DefaultWorkerCount batches of emitBatch entries.
// NOTE(review): presumably the capacity of the reader's channels — confirm.
defaultReaderChanSize = mysql.DefaultWorkerCount * emitBatch
// maxTotalMemoryUsage is the limit handed to util.WaitMemoryAvailable, which
// compares it against the used-memory percentage, so the unit is percent.
// NOTE(review): the two assignments below look like the removed (90.0) and
// added (80.0) lines of this diff hunk rather than a real double declaration.
maxTotalMemoryUsage = 90.0
maxTotalMemoryUsage = 80.0
// maxWaitDuration is the timeout handed to util.WaitMemoryAvailable.
maxWaitDuration = time.Minute * 2
)

Expand Down Expand Up @@ -205,11 +205,6 @@ func (l *LogReader) runReader(egCtx context.Context, cfg *readerConfig) error {
case l.rowCh <- row:
}
}
err := util.WaitMemoryAvailable(maxTotalMemoryUsage, maxWaitDuration)
if err != nil {
return errors.Trace(err)
}

case redo.RedoDDLLogFileType:
ddl := item.data.RedoDDL.DDL
if ddl != nil && ddl.CommitTs > cfg.startTs && ddl.CommitTs <= cfg.endTs {
Expand Down
38 changes: 37 additions & 1 deletion pkg/cmd/redo/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,27 @@
package redo

import (
"net/http"
_ "net/http/pprof" // init pprof
"net/url"
"runtime/debug"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/applier"
cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

// applyRedoOptions defines flags for the `redo apply` command.
type applyRedoOptions struct {
// options is embedded; its definition is outside this view.
// NOTE(review): presumably it supplies o.storage, which run reads — confirm.
options
// sinkURI is the target database sink-uri, bound to the required
// --sink-uri flag and normalized in complete.
// NOTE(review): the field appears twice because the diff view shows the line
// before and after it was re-aligned; a real file declares it once.
sinkURI string
sinkURI string
// enableProfiling is bound to --enable-profiling (default true); when set,
// run starts an HTTP pprof server on ":6060".
enableProfiling bool
// memoryLimitInGiBytes is bound to --memory-limit (default 10) and is
// expressed in GiB. complete converts it to bytes and, when
// util.GetMemoryLimit succeeds with a non-zero value, caps it at 80% of that
// value before passing it to debug.SetMemoryLimit.
memoryLimitInGiBytes int64
}

// newapplyRedoOptions creates new applyRedoOptions for the `redo apply` command.
Expand All @@ -39,6 +48,8 @@ func (o *applyRedoOptions) addFlags(cmd *cobra.Command) {
cmd.Flags().StringVar(&o.sinkURI, "sink-uri", "", "target database sink-uri")
// the possible error returned from MarkFlagRequired is `no such flag`
cmd.MarkFlagRequired("sink-uri") //nolint:errcheck
cmd.Flags().BoolVar(&o.enableProfiling, "enable-profiling", true, "enable pprof profiling")
cmd.Flags().Int64Var(&o.memoryLimitInGiBytes, "memory-limit", 10, "memory limit in GiB")
}

//nolint:unparam
Expand All @@ -55,13 +66,38 @@ func (o *applyRedoOptions) complete(cmd *cobra.Command) error {
sinkURI.RawQuery = rawQuery.Encode()
o.sinkURI = sinkURI.String()
}

totalMemory, err := util.GetMemoryLimit()
if err == nil {
totalMemoryInBytes := int64(float64(totalMemory) * 0.8)
memoryLimitInBytes := o.memoryLimitInGiBytes * 1024 * 1024 * 1024
if totalMemoryInBytes != 0 && memoryLimitInBytes > totalMemoryInBytes {
memoryLimitInBytes = totalMemoryInBytes
}
debug.SetMemoryLimit(memoryLimitInBytes)
log.Info("set memory limit", zap.Int64("memoryLimit", memoryLimitInBytes))
}

return nil
}

// run runs the `redo apply` command.
func (o *applyRedoOptions) run(cmd *cobra.Command) error {
ctx := cmdcontext.GetDefaultContext()

if o.enableProfiling {
go func() {
server := &http.Server{
Addr: ":6060",
ReadHeaderTimeout: 5 * time.Second,
}
log.Info("Start http pprof server", zap.String("addr", server.Addr))
if err := server.ListenAndServe(); err != nil {
log.Fatal("http pprof", zap.Error(err))
}
}()
}

cfg := &applier.RedoApplierConfig{
Storage: o.storage,
SinkURI: o.sinkURI,
Expand Down
28 changes: 18 additions & 10 deletions pkg/util/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package util

import (
"context"
"math"
"time"

Expand Down Expand Up @@ -48,21 +49,28 @@ func CheckMemoryUsage(limit float64) (bool, error) {
if err != nil {
return false, err
}

log.Info("check memory usage", zap.Any("memory", stat))
return stat.UsedPercent < limit, nil
}

// WaitMemoryAvailable polls memory usage every 5 seconds and returns nil once it is below limit; it returns an error if ctx is canceled or timeout elapses first.
func WaitMemoryAvailable(limit float64, timeout time.Duration) error {
start := time.Now()
func WaitMemoryAvailable(ctx context.Context, limit float64, timeout time.Duration) error {
ticker := time.NewTicker(time.Second * 5)
timeoutTimer := time.NewTimer(timeout)
for {
hasFreeMemory, err := CheckMemoryUsage(limit)
if err != nil {
return err
}
if hasFreeMemory {
return nil
}
if time.Since(start) > timeout {
select {
case <-ctx.Done():
return errors.WrapError(errors.ErrWaitFreeMemoryTimeout, ctx.Err())
case <-ticker.C:
hasFreeMemory, err := CheckMemoryUsage(limit)
if err != nil {
return err
}
if hasFreeMemory {
return nil
}
case <-timeoutTimer.C:
return errors.ErrWaitFreeMemoryTimeout.GenWithStackByArgs()
}
}
Expand Down

0 comments on commit 35703b2

Please sign in to comment.