Skip to content

Commit

Permalink
refactor: flushAll method to wait for all log flushes to complete (#5162
Browse files Browse the repository at this point in the history
)

Signed-off-by: naonao2323 <yanagidanaoki9900@gmail.com>
  • Loading branch information
naonao2323 committed Sep 2, 2024
1 parent 3366724 commit a5cc7fe
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions pkg/app/piped/logpersister/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
Expand Down Expand Up @@ -157,17 +158,26 @@ func (p *persister) flush(ctx context.Context) (flushes, deletes int) {
}

func (p *persister) flushAll(ctx context.Context) int {
group, ctx := errgroup.WithContext(ctx)
var num = 0

p.stagePersisters.Range(func(_, v interface{}) bool {
sp := v.(*stageLogPersister)
if !sp.isStale(p.stalePeriod) {
group.Go(func() error {
return sp.flushFromLastCheckpoint(ctx)
})
num++
go sp.flushFromLastCheckpoint(ctx)
}
return true
})

if err := group.Wait(); err != nil {
p.logger.Error(
"failed to flush all stage persisters",
zap.Error(err),
)
return num
}
p.logger.Info(fmt.Sprintf("flushing all of %d stage persisters", num))
return num
}
Expand Down

0 comments on commit a5cc7fe

Please sign in to comment.