Skip to content

Commit

Permalink
Make controller's os.RemoveAll single-threaded (#5217)
Browse files Browse the repository at this point in the history
* Make controller's os.RemoveAll single-threaded

Signed-off-by: Shinnosuke Sawada-Dazai <shin@warashi.dev>

* Close the workingDirRemovalCh

Signed-off-by: Shinnosuke Sawada-Dazai <shin@warashi.dev>

---------

Signed-off-by: Shinnosuke Sawada-Dazai <shin@warashi.dev>
  • Loading branch information
Warashi authored Sep 24, 2024
1 parent 29dd1b6 commit 65c98a6
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions pkg/app/piped/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ type controller struct {
// WaitGroup for waiting the completions of all planners, schedulers.
wg sync.WaitGroup

// workingDirRemovalCh is used to single-threaded removal of working directory.
workingDirRemovalCh chan string

workspaceDir string
syncInternal time.Duration
gracePeriod time.Duration
Expand Down Expand Up @@ -186,6 +189,8 @@ func NewController(
mostRecentlySuccessfulCommits: make(map[string]string),
mostRecentlySuccessfulConfigFilenames: make(map[string]string),

workingDirRemovalCh: make(chan string),

syncInternal: 10 * time.Second,
gracePeriod: gracePeriod,
logger: lg,
Expand Down Expand Up @@ -219,6 +224,16 @@ func (c *controller) Run(ctx context.Context) error {
close(lpStoppedCh)
}()

// Start workspace cleaner.
// This will remove the workspace directory of the completed planner/scheduler.
go func() {
for ws := range c.workingDirRemovalCh {
if err := os.RemoveAll(ws); err != nil {
c.logger.Error("failed to remove working directory", zap.String("workDir", ws), zap.Error(err))
}
}
}()

ticker := time.NewTicker(c.syncInternal)
defer ticker.Stop()
c.logger.Info("start syncing planners and schedulers")
Expand Down Expand Up @@ -246,10 +261,19 @@ func (c *controller) shutdown(cancel func(), stoppedCh <-chan error) error {
cancel()
err := <-stoppedCh

// Stop the workspace cleaner.
// all calls of removeWorkingDir is completed because the c.wg.Wait() is done.
// so we can close the channel safely.
close(c.workingDirRemovalCh)

c.logger.Info("controller has been stopped")
return err
}

func (c *controller) removeWorkingDir(ws string) {
c.workingDirRemovalCh <- ws
}

// checkCommands lists all unhandled commands for running deployments
// and forwards them to their planners and schedulers.
func (c *controller) checkCommands() {
Expand Down Expand Up @@ -483,10 +507,8 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) (
)

cleanup := func() {
logger.Info("cleaning up working directory for planner")
if err := os.RemoveAll(workingDir); err != nil {
logger.Warn("failed to clean working directory", zap.Error(err))
}
c.removeWorkingDir(workingDir)
logger.Info("cleaned working directory for planner")
}

// Start running planner.
Expand Down Expand Up @@ -627,15 +649,8 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment)
)

cleanup := func() {
logger.Info("cleaning up working directory for scheduler", zap.String("working-dir", workingDir))
err := os.RemoveAll(workingDir)
if err == nil {
return
}
logger.Warn("failed to clean working directory",
zap.String("working-dir", workingDir),
zap.Error(err),
)
c.removeWorkingDir(workingDir)
logger.Info("cleaned working directory for scheduler", zap.String("working-dir", workingDir))
}

// Start running scheduler.
Expand Down

0 comments on commit 65c98a6

Please sign in to comment.