From 35703b2fdd233a52adc2c672155a51cab24e0b10 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Sat, 27 Apr 2024 08:28:27 +0800 Subject: [PATCH] redo(ticdc): enable pprof and set memory limit for redo applier (#10904) close pingcap/tiflow#10900 --- cdc/redo/reader/reader.go | 7 +------ pkg/cmd/redo/apply.go | 38 +++++++++++++++++++++++++++++++++++++- pkg/util/memory.go | 28 ++++++++++++++++++---------- 3 files changed, 56 insertions(+), 17 deletions(-) diff --git a/cdc/redo/reader/reader.go b/cdc/redo/reader/reader.go index aaf6c542154..a8a7dd47616 100644 --- a/cdc/redo/reader/reader.go +++ b/cdc/redo/reader/reader.go @@ -37,7 +37,7 @@ import ( const ( emitBatch = mysql.DefaultMaxTxnRow defaultReaderChanSize = mysql.DefaultWorkerCount * emitBatch - maxTotalMemoryUsage = 90.0 + maxTotalMemoryUsage = 80.0 maxWaitDuration = time.Minute * 2 ) @@ -205,11 +205,6 @@ func (l *LogReader) runReader(egCtx context.Context, cfg *readerConfig) error { case l.rowCh <- row: } } - err := util.WaitMemoryAvailable(maxTotalMemoryUsage, maxWaitDuration) - if err != nil { - return errors.Trace(err) - } - case redo.RedoDDLLogFileType: ddl := item.data.RedoDDL.DDL if ddl != nil && ddl.CommitTs > cfg.startTs && ddl.CommitTs <= cfg.endTs { diff --git a/pkg/cmd/redo/apply.go b/pkg/cmd/redo/apply.go index b06276becc4..6725e3d4eca 100644 --- a/pkg/cmd/redo/apply.go +++ b/pkg/cmd/redo/apply.go @@ -14,18 +14,27 @@ package redo import ( + "net/http" + _ "net/http/pprof" // init pprof "net/url" + "runtime/debug" + "time" + "github.com/pingcap/log" "github.com/pingcap/tiflow/pkg/applier" cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/util" "github.com/spf13/cobra" + "go.uber.org/zap" ) // applyRedoOptions defines flags for the `redo apply` command. type applyRedoOptions struct { options - sinkURI string + sinkURI string + enableProfiling bool + memoryLimitInGiBytes int64 } // newapplyRedoOptions creates new applyRedoOptions for the `redo apply` command. @@ -39,6 +48,8 @@ func (o *applyRedoOptions) addFlags(cmd *cobra.Command) { cmd.Flags().StringVar(&o.sinkURI, "sink-uri", "", "target database sink-uri") // the possible error returned from MarkFlagRequired is `no such flag` cmd.MarkFlagRequired("sink-uri") //nolint:errcheck + cmd.Flags().BoolVar(&o.enableProfiling, "enable-profiling", true, "enable pprof profiling") + cmd.Flags().Int64Var(&o.memoryLimitInGiBytes, "memory-limit", 10, "memory limit in GiB") } //nolint:unparam @@ -55,6 +66,18 @@ func (o *applyRedoOptions) complete(cmd *cobra.Command) error { sinkURI.RawQuery = rawQuery.Encode() o.sinkURI = sinkURI.String() } + + totalMemory, err := util.GetMemoryLimit() + if err == nil { + totalMemoryInBytes := int64(float64(totalMemory) * 0.8) + memoryLimitInBytes := o.memoryLimitInGiBytes * 1024 * 1024 * 1024 + if totalMemoryInBytes != 0 && memoryLimitInBytes > totalMemoryInBytes { + memoryLimitInBytes = totalMemoryInBytes + } + debug.SetMemoryLimit(memoryLimitInBytes) + log.Info("set memory limit", zap.Int64("memoryLimit", memoryLimitInBytes)) + } + return nil } @@ -62,6 +85,19 @@ func (o *applyRedoOptions) complete(cmd *cobra.Command) error { func (o *applyRedoOptions) run(cmd *cobra.Command) error { ctx := cmdcontext.GetDefaultContext() + if o.enableProfiling { + go func() { + server := &http.Server{ + Addr: ":6060", + ReadHeaderTimeout: 5 * time.Second, + } + log.Info("Start http pprof server", zap.String("addr", server.Addr)) + if err := server.ListenAndServe(); err != nil { + log.Fatal("http pprof", zap.Error(err)) + } + }() + } + cfg := &applier.RedoApplierConfig{ Storage: o.storage, SinkURI: o.sinkURI, diff --git a/pkg/util/memory.go b/pkg/util/memory.go index 0ef422d7b0c..de587a28783 100644 --- a/pkg/util/memory.go +++ b/pkg/util/memory.go @@ -14,6 +14,7 @@ package util import ( + "context" "math" "time" @@ -48,21 +49,28 @@ func CheckMemoryUsage(limit float64) (bool, error) { if err != nil { return false, err } + + log.Info("check memory usage", zap.Any("memory", stat)) return stat.UsedPercent < limit, nil } // WaitMemoryAvailable waits until the memory usage is less than the limit. -func WaitMemoryAvailable(limit float64, timeout time.Duration) error { - start := time.Now() +func WaitMemoryAvailable(ctx context.Context, limit float64, timeout time.Duration) error { + ticker := time.NewTicker(time.Second * 5) + timeoutTimer := time.NewTimer(timeout) for { - hasFreeMemory, err := CheckMemoryUsage(limit) - if err != nil { - return err - } - if hasFreeMemory { - return nil - } - if time.Since(start) > timeout { + select { + case <-ctx.Done(): + return errors.WrapError(errors.ErrWaitFreeMemoryTimeout, ctx.Err()) + case <-ticker.C: + hasFreeMemory, err := CheckMemoryUsage(limit) + if err != nil { + return err + } + if hasFreeMemory { + return nil + } + case <-timeoutTimer.C: return errors.ErrWaitFreeMemoryTimeout.GenWithStackByArgs() } }