diff --git a/cdc/redo/writer/file.go b/cdc/redo/writer/file.go index 86a636af3d1..0b4d4fb136e 100644 --- a/cdc/redo/writer/file.go +++ b/cdc/redo/writer/file.go @@ -459,6 +459,10 @@ func (w *Writer) shouldRemoved(checkPointTs uint64, f os.FileInfo) (bool, error) func (w *Writer) getShouldRemovedFiles(checkPointTs uint64) ([]os.FileInfo, error) { files, err := ioutil.ReadDir(w.cfg.Dir) if err != nil { + if os.IsNotExist(err) { + log.Warn("check removed log dir fail", zap.Error(err)) + return []os.FileInfo{}, nil + } return nil, cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotatef(err, "can't read log file directory: %s", w.cfg.Dir)) } diff --git a/cdc/redo/writer/file_test.go b/cdc/redo/writer/file_test.go index e71b814d553..208e843589f 100644 --- a/cdc/redo/writer/file_test.go +++ b/cdc/redo/writer/file_test.go @@ -197,8 +197,17 @@ func TestWriterGC(t *testing.T) { require.Nil(t, err, files[0].Name()) require.EqualValues(t, 3, ts) require.Equal(t, common.DefaultRowLogFileType, fileType) - time.Sleep(time.Duration(100) * time.Millisecond) + + w1 := &Writer{ + cfg: cfg, + uint64buf: make([]byte, 8), + storage: mockStorage, + } + w1.cfg.Dir += "not-exist" + w1.running.Store(true) + err = w1.GC(111) + require.Nil(t, err) } func TestAdvanceTs(t *testing.T) { diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go index fceb3de233b..debf2243596 100644 --- a/cdc/redo/writer/writer.go +++ b/cdc/redo/writer/writer.go @@ -175,12 +175,52 @@ func NewLogWriter(ctx context.Context, cfg *LogWriterConfig) (*LogWriter, error) if err != nil { return nil, err } + } else { + if cfg.S3Storage { + // since other process get the remove changefeed job async, may still write some logs after owner delete the log + err = logWriter.preCleanUpS3(ctx) + if err != nil { + return nil, err + } + } } logWriters[cfg.ChangeFeedID] = logWriter go logWriter.runGC(ctx) return logWriter, nil } +func (l *LogWriter) preCleanUpS3(ctx context.Context) error { + ret, err := l.storage.FileExists(ctx, l.getDeletedChangefeedMarker()) + if err != nil { + return cerror.WrapError(cerror.ErrS3StorageAPI, err) + } + if !ret { + return nil + } + + files, err := getAllFilesInS3(ctx, l) + if err != nil { + return err + } + + ff := []string{} + for _, file := range files { + if file != l.getDeletedChangefeedMarker() { + ff = append(ff, file) + } + } + err = l.deleteFilesInS3(ctx, ff) + if err != nil { + return err + } + err = l.storage.DeleteFile(ctx, l.getDeletedChangefeedMarker()) + if !isNotExistInS3(err) { + return cerror.WrapError(cerror.ErrS3StorageAPI, err) + } + + return nil +} + func (l *LogWriter) initMeta(ctx context.Context) error { select { case <-ctx.Done(): @@ -433,7 +473,18 @@ func (l *LogWriter) DeleteAllLogs(ctx context.Context) error { } // after delete logs, rm the LogWriter since it is already closed l.cleanUpLogWriter() - return nil + + // write a marker to s3, since other process get the remove changefeed job async, + // may still write some logs after owner delete the log + return l.writeDeletedMarkerToS3(ctx) +} + +func (l *LogWriter) getDeletedChangefeedMarker() string { + return fmt.Sprintf("delete_%s", l.cfg.ChangeFeedID) +} + +func (l *LogWriter) writeDeletedMarkerToS3(ctx context.Context) error { + return cerror.WrapError(cerror.ErrS3StorageAPI, l.storage.WriteFile(ctx, l.getDeletedChangefeedMarker(), []byte("D"))) } func (l *LogWriter) cleanUpLogWriter() { diff --git a/cdc/redo/writer/writer_test.go b/cdc/redo/writer/writer_test.go index 618281b5537..e32f3074fa1 100644 --- a/cdc/redo/writer/writer_test.go +++ b/cdc/redo/writer/writer_test.go @@ -18,6 +18,7 @@ import ( "fmt" "io/ioutil" "math" + "net/url" "os" "path/filepath" "strings" @@ -29,6 +30,7 @@ import ( "github.com/golang/mock/gomock" "github.com/pingcap/errors" mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage" + "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -675,6 +677,31 @@ func TestNewLogWriter(t *testing.T) { require.Equal(t, meta.ResolvedTs, l.meta.ResolvedTs) require.Equal(t, map[int64]uint64{}, l.meta.ResolvedTsList) time.Sleep(time.Millisecond * time.Duration(math.Max(float64(defaultFlushIntervalInMs), float64(defaultGCIntervalInMs))+1)) + + origin := common.InitS3storage + defer func() { + common.InitS3storage = origin + }() + controller := gomock.NewController(t) + mockStorage := mockstorage.NewMockExternalStorage(controller) + // skip pre cleanup + mockStorage.EXPECT().FileExists(gomock.Any(), gomock.Any()).Return(false, nil) + common.InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) { + return mockStorage, nil + } + cfg3 := &LogWriterConfig{ + Dir: dir, + ChangeFeedID: "test-cf112232", + CaptureID: "cp", + MaxLogSize: 10, + CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}), + FlushIntervalInMs: 5, + S3Storage: true, + } + l3, err := NewLogWriter(ctx, cfg3) + require.Nil(t, err) + err = l3.Close() + require.Nil(t, err) } func TestWriterRedoGC(t *testing.T) { @@ -750,6 +777,7 @@ func TestDeleteAllLogs(t *testing.T) { closeErr error getAllFilesInS3Err error deleteFileErr error + writeFileErr error wantErr string }{ { @@ -783,6 +811,12 @@ func TestDeleteAllLogs(t *testing.T) { args: args{enableS3: true}, deleteFileErr: awserr.New(s3.ErrCodeNoSuchKey, "no such key", nil), }, + { + name: "writerFile err", + args: args{enableS3: true}, + writeFileErr: errors.New("xx"), + wantErr: ".*xx*.", + }, } for _, tt := range tests { @@ -803,6 +837,8 @@ func TestDeleteAllLogs(t *testing.T) { mockStorage := mockstorage.NewMockExternalStorage(controller) mockStorage.EXPECT().DeleteFile(gomock.Any(), gomock.Any()).Return(tt.deleteFileErr).MaxTimes(2) + mockStorage.EXPECT().WriteFile(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.writeFileErr).MaxTimes(1) + mockWriter := &mockFileWriter{} mockWriter.On("Close").Return(tt.closeErr) cfg := &LogWriterConfig{ @@ -829,7 +865,8 @@ func TestDeleteAllLogs(t *testing.T) { require.Regexp(t, tt.wantErr, ret.Error(), tt.name) } else { require.Nil(t, ret, tt.name) - require.Equal(t, 0, len(logWriters), tt.name) + _, ok := logWriters[writer.cfg.ChangeFeedID] + require.False(t, ok, tt.name) if !tt.args.enableS3 { _, err := os.Stat(dir) require.True(t, os.IsNotExist(err), tt.name) @@ -839,3 +876,73 @@ func TestDeleteAllLogs(t *testing.T) { getAllFilesInS3 = origin } } + +func TestPreCleanUpS3(t *testing.T) { + testCases := []struct { + name string + fileExistsErr error + fileExists bool + getAllFilesInS3Err error + deleteFileErr error + wantErr string + }{ + { + name: "happy no marker", + fileExists: false, + }, + { + name: "fileExists err", + fileExistsErr: errors.New("xx"), + wantErr: ".*xx*.", + }, + { + name: "getAllFilesInS3 err", + fileExists: true, + getAllFilesInS3Err: errors.New("xx"), + wantErr: ".*xx*.", + }, + { + name: "deleteFile normal err", + fileExists: true, + deleteFileErr: errors.New("xx"), + wantErr: ".*ErrS3StorageAPI*.", + }, + { + name: "deleteFile notExist err", + fileExists: true, + deleteFileErr: awserr.New(s3.ErrCodeNoSuchKey, "no such key", nil), + }, + } + + for _, tc := range testCases { + origin := getAllFilesInS3 + getAllFilesInS3 = func(ctx context.Context, l *LogWriter) ([]string, error) { + return []string{"1", "11", "delete_test-cf"}, tc.getAllFilesInS3Err + } + controller := gomock.NewController(t) + mockStorage := mockstorage.NewMockExternalStorage(controller) + + mockStorage.EXPECT().FileExists(gomock.Any(), gomock.Any()).Return(tc.fileExists, tc.fileExistsErr) + mockStorage.EXPECT().DeleteFile(gomock.Any(), gomock.Any()).Return(tc.deleteFileErr).MaxTimes(3) + + cfg := &LogWriterConfig{ + Dir: "dir", + ChangeFeedID: "test-cf", + CaptureID: "cp", + MaxLogSize: 10, + CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}), + FlushIntervalInMs: 5, + } + writer := LogWriter{ + cfg: cfg, + storage: mockStorage, + } + ret := writer.preCleanUpS3(context.Background()) + if tc.wantErr != "" { + require.Regexp(t, tc.wantErr, ret.Error(), tc.name) + } else { + require.Nil(t, ret, tc.name) + } + getAllFilesInS3 = origin + } +}