Skip to content

Commit

Permalink
redo(ticdc): Pre-create redolog dir before file allocation (#6363)
Browse files Browse the repository at this point in the history
close #6380
  • Loading branch information
zhaoxinyu authored Jul 21, 2022
1 parent 74302ac commit 97f6592
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 5 deletions.
21 changes: 21 additions & 0 deletions cdc/redo/writer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,16 @@ func NewWriter(ctx context.Context, cfg *FileWriterConfig, opts ...Option) (*Wri
w.uuidGenerator = uuid.NewGenerator()
}

if len(cfg.Dir) == 0 {
return nil, cerror.WrapError(cerror.ErrRedoFileOp, errors.New("invalid redo dir path"))
}

err := os.MkdirAll(cfg.Dir, common.DefaultDirMode)
if err != nil {
return nil, cerror.WrapError(cerror.ErrRedoFileOp,
errors.Annotatef(err, "can't make dir: %s for redo writing", cfg.Dir))
}

// if we use S3 as the remote storage, a file allocator can be leveraged to
// pre-allocate files for us.
// TODO: test whether this improvement can also be applied to NFS.
Expand Down Expand Up @@ -333,6 +343,17 @@ func (w *Writer) close() error {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}

dirFile, err := os.Open(w.cfg.Dir)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
defer dirFile.Close()
// sync the dir so as to guarantee the renamed file is persisted to disk.
err = dirFile.Sync()
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}

// We only write content to S3 before closing the local file.
// By this way, we no longer need renaming object in S3.
if w.cfg.S3Storage {
Expand Down
23 changes: 19 additions & 4 deletions cdc/redo/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,20 +551,35 @@ func (l *LogWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error {
return cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "can't make dir for new redo logfile"))
}

metaFile, err := openTruncFile(l.filePath())
// we will create a temp metadata file and then atomically rename it.
tmpFileName := l.filePath() + common.MetaTmpEXT
tmpFile, err := openTruncFile(tmpFileName)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
defer tmpFile.Close()

_, err = metaFile.Write(data)
_, err = tmpFile.Write(data)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
err = metaFile.Sync()
err = tmpFile.Sync()
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
err = metaFile.Close()

err = os.Rename(tmpFileName, l.filePath())
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}

dirFile, err := os.Open(l.cfg.Dir)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
defer dirFile.Close()
// sync the dir so as to guarantee the renamed file is persisted to disk.
err = dirFile.Sync()
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/fsutil/file_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,21 @@ func TestFileAllocateSuccess(t *testing.T) {
}

func TestFileAllocateFailed(t *testing.T) {
// 1. the requested allocation space will cause disk full
fl := NewFileAllocator(t.TempDir(), "test", math.MaxInt64)
defer fl.Close()

f, err := fl.Open()
require.Nil(t, f)
require.NotNil(t, err)
f.Close()
fl.Close()

// 2. the directory does not exist
fl = NewFileAllocator("not-exist-dir", "test", 1024)
f, err = fl.Open()
require.NotNil(t, err)
require.Nil(t, f)
fl.Close()
}

func benchmarkWriteData(b *testing.B, size int, useFileAlloctor bool) {
Expand Down

0 comments on commit 97f6592

Please sign in to comment.