Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Jun 14, 2024
1 parent c4a011d commit 2a3823e
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 11 deletions.
9 changes: 8 additions & 1 deletion pkg/ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ingest
import (
"context"
"math"
"os"
"path/filepath"
"strconv"
"sync"
Expand Down Expand Up @@ -126,7 +127,13 @@ func (m *litBackendCtxMgr) Register(
if !ok {
return nil, genBackendAllocMemFailedErr(ctx, m.memRoot, jobID)
}
cfg, err := genConfig(ctx, m.encodeJobSortPath(jobID), m.memRoot, hasUnique, resourceGroupName)
sortPath := m.encodeJobSortPath(jobID)
err := os.MkdirAll(sortPath, 0700)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err))
return nil, err
}
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName)
if err != nil {
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
Expand Down
16 changes: 7 additions & 9 deletions pkg/ddl/ingest/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,16 @@ func GenIngestTempDataDir() (string, error) {
return sortPath, nil
}

// CleanUpTempDir reads the temp dir and removes the stale index data.
// This function will get
// CleanUpTempDir is used to remove the stale index data.
// This function gets running DDL jobs from `mysql.tidb_ddl_job` and
// it only removes the folders that related to finished jobs.
func CleanUpTempDir(ctx context.Context, se sessionctx.Context, path string) {
err := os.MkdirAll(path, 0700)
if err != nil {
logutil.DDLIngestLogger().Error(LitErrCreateDirFail, zap.Error(err))
return
}

entries, err := os.ReadDir(path)
if err != nil {
logutil.DDLIngestLogger().Error(LitErrReadSortPath, zap.Error(err))
if strings.Contains(err.Error(), "no such file") {
return
}
logutil.DDLIngestLogger().Warn(LitErrCleanSortPath, zap.Error(err))
return
}
toCheckJobIDs := make(map[int64]struct{}, len(entries))
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/ingest/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,6 @@ func TestLitBackendCtxMgr(t *testing.T) {
ingest.CleanUpTempDir(ctx, tk.Session(), sortPath)
require.NoDirExists(t, staleJobDir)
require.NoDirExists(t, staleJobDir2)

ingest.CleanUpTempDir(ctx, tk.Session(), "unkown_path")
}
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
LitErrExceedConcurrency string = "the concurrency is greater than ingest limit"
LitErrCloseWriterErr string = "close writer error"
LitErrReadSortPath string = "cannot read sort path"
LitErrCleanSortPath string = "cannot cleanup sort path"
LitErrCleanSortPath string = "clean up temp dir failed"
LitErrResetEngineFail string = "reset engine failed"
LitWarnEnvInitFail string = "initialize environment failed"
LitWarnConfigError string = "build config for backend failed"
Expand Down

0 comments on commit 2a3823e

Please sign in to comment.