Skip to content

Commit

Permalink
redolog: add a precleanup process when s3 enable (#3525)
Browse files Browse the repository at this point in the history
  • Loading branch information
ben1009 authored Dec 14, 2021
1 parent 958f142 commit fbf5f4e
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 3 deletions.
4 changes: 4 additions & 0 deletions cdc/redo/writer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,10 @@ func (w *Writer) shouldRemoved(checkPointTs uint64, f os.FileInfo) (bool, error)
func (w *Writer) getShouldRemovedFiles(checkPointTs uint64) ([]os.FileInfo, error) {
files, err := ioutil.ReadDir(w.cfg.Dir)
if err != nil {
if os.IsNotExist(err) {
log.Warn("check removed log dir fail", zap.Error(err))
return []os.FileInfo{}, nil
}
return nil, cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotatef(err, "can't read log file directory: %s", w.cfg.Dir))
}

Expand Down
11 changes: 10 additions & 1 deletion cdc/redo/writer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,17 @@ func TestWriterGC(t *testing.T) {
require.Nil(t, err, files[0].Name())
require.EqualValues(t, 3, ts)
require.Equal(t, common.DefaultRowLogFileType, fileType)

time.Sleep(time.Duration(100) * time.Millisecond)

w1 := &Writer{
cfg: cfg,
uint64buf: make([]byte, 8),
storage: mockStorage,
}
w1.cfg.Dir += "not-exist"
w1.running.Store(true)
err = w1.GC(111)
require.Nil(t, err)
}

func TestAdvanceTs(t *testing.T) {
Expand Down
53 changes: 52 additions & 1 deletion cdc/redo/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ func NewLogWriter(ctx context.Context, cfg *LogWriterConfig) (*LogWriter, error)
if err != nil {
return nil, err
}
} else {
if cfg.S3Storage {
// since other process get the remove changefeed job async, may still write some logs after owner delete the log
err = logWriter.preCleanUpS3(ctx)
if err != nil {
return nil, err
}
}
}

logWriter.metricTotalRowsCount = redoTotalRowsCountGauge.WithLabelValues(cfg.CaptureID, cfg.ChangeFeedID)
Expand All @@ -186,6 +194,38 @@ func NewLogWriter(ctx context.Context, cfg *LogWriterConfig) (*LogWriter, error)
return logWriter, nil
}

func (l *LogWriter) preCleanUpS3(ctx context.Context) error {
ret, err := l.storage.FileExists(ctx, l.getDeletedChangefeedMarker())
if err != nil {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
}
if !ret {
return nil
}

files, err := getAllFilesInS3(ctx, l)
if err != nil {
return err
}

ff := []string{}
for _, file := range files {
if file != l.getDeletedChangefeedMarker() {
ff = append(ff, file)
}
}
err = l.deleteFilesInS3(ctx, ff)
if err != nil {
return err
}
err = l.storage.DeleteFile(ctx, l.getDeletedChangefeedMarker())
if !isNotExistInS3(err) {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
}

return nil
}

func (l *LogWriter) initMeta(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -441,7 +481,18 @@ func (l *LogWriter) DeleteAllLogs(ctx context.Context) error {
}
// after delete logs, rm the LogWriter since it is already closed
l.cleanUpLogWriter()
return nil

// write a marker to s3, since other process get the remove changefeed job async,
// may still write some logs after owner delete the log
return l.writeDeletedMarkerToS3(ctx)
}

func (l *LogWriter) getDeletedChangefeedMarker() string {
return fmt.Sprintf("delete_%s", l.cfg.ChangeFeedID)
}

func (l *LogWriter) writeDeletedMarkerToS3(ctx context.Context) error {
return cerror.WrapError(cerror.ErrS3StorageAPI, l.storage.WriteFile(ctx, l.getDeletedChangefeedMarker(), []byte("D")))
}

func (l *LogWriter) cleanUpLogWriter() {
Expand Down
109 changes: 108 additions & 1 deletion cdc/redo/writer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"io/ioutil"
"math"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/pingcap/ticdc/cdc/redo/common"
cerror "github.com/pingcap/ticdc/pkg/errors"
mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
Expand Down Expand Up @@ -676,6 +678,31 @@ func TestNewLogWriter(t *testing.T) {
require.Equal(t, meta.ResolvedTs, l.meta.ResolvedTs)
require.Equal(t, map[int64]uint64{}, l.meta.ResolvedTsList)
time.Sleep(time.Millisecond * time.Duration(math.Max(float64(defaultFlushIntervalInMs), float64(defaultGCIntervalInMs))+1))

origin := common.InitS3storage
defer func() {
common.InitS3storage = origin
}()
controller := gomock.NewController(t)
mockStorage := mockstorage.NewMockExternalStorage(controller)
// skip pre cleanup
mockStorage.EXPECT().FileExists(gomock.Any(), gomock.Any()).Return(false, nil)
common.InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
return mockStorage, nil
}
cfg3 := &LogWriterConfig{
Dir: dir,
ChangeFeedID: "test-cf112232",
CaptureID: "cp",
MaxLogSize: 10,
CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}),
FlushIntervalInMs: 5,
S3Storage: true,
}
l3, err := NewLogWriter(ctx, cfg3)
require.Nil(t, err)
err = l3.Close()
require.Nil(t, err)
}

func TestWriterRedoGC(t *testing.T) {
Expand Down Expand Up @@ -751,6 +778,7 @@ func TestDeleteAllLogs(t *testing.T) {
closeErr error
getAllFilesInS3Err error
deleteFileErr error
writeFileErr error
wantErr string
}{
{
Expand Down Expand Up @@ -784,6 +812,12 @@ func TestDeleteAllLogs(t *testing.T) {
args: args{enableS3: true},
deleteFileErr: awserr.New(s3.ErrCodeNoSuchKey, "no such key", nil),
},
{
name: "writerFile err",
args: args{enableS3: true},
writeFileErr: errors.New("xx"),
wantErr: ".*xx*.",
},
}

for _, tt := range tests {
Expand All @@ -804,6 +838,8 @@ func TestDeleteAllLogs(t *testing.T) {
mockStorage := mockstorage.NewMockExternalStorage(controller)

mockStorage.EXPECT().DeleteFile(gomock.Any(), gomock.Any()).Return(tt.deleteFileErr).MaxTimes(2)
mockStorage.EXPECT().WriteFile(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.writeFileErr).MaxTimes(1)

mockWriter := &mockFileWriter{}
mockWriter.On("Close").Return(tt.closeErr)
cfg := &LogWriterConfig{
Expand All @@ -830,7 +866,8 @@ func TestDeleteAllLogs(t *testing.T) {
require.Regexp(t, tt.wantErr, ret.Error(), tt.name)
} else {
require.Nil(t, ret, tt.name)
require.Equal(t, 0, len(logWriters), tt.name)
_, ok := logWriters[writer.cfg.ChangeFeedID]
require.False(t, ok, tt.name)
if !tt.args.enableS3 {
_, err := os.Stat(dir)
require.True(t, os.IsNotExist(err), tt.name)
Expand All @@ -840,3 +877,73 @@ func TestDeleteAllLogs(t *testing.T) {
getAllFilesInS3 = origin
}
}

func TestPreCleanUpS3(t *testing.T) {
testCases := []struct {
name string
fileExistsErr error
fileExists bool
getAllFilesInS3Err error
deleteFileErr error
wantErr string
}{
{
name: "happy no marker",
fileExists: false,
},
{
name: "fileExists err",
fileExistsErr: errors.New("xx"),
wantErr: ".*xx*.",
},
{
name: "getAllFilesInS3 err",
fileExists: true,
getAllFilesInS3Err: errors.New("xx"),
wantErr: ".*xx*.",
},
{
name: "deleteFile normal err",
fileExists: true,
deleteFileErr: errors.New("xx"),
wantErr: ".*ErrS3StorageAPI*.",
},
{
name: "deleteFile notExist err",
fileExists: true,
deleteFileErr: awserr.New(s3.ErrCodeNoSuchKey, "no such key", nil),
},
}

for _, tc := range testCases {
origin := getAllFilesInS3
getAllFilesInS3 = func(ctx context.Context, l *LogWriter) ([]string, error) {
return []string{"1", "11", "delete_test-cf"}, tc.getAllFilesInS3Err
}
controller := gomock.NewController(t)
mockStorage := mockstorage.NewMockExternalStorage(controller)

mockStorage.EXPECT().FileExists(gomock.Any(), gomock.Any()).Return(tc.fileExists, tc.fileExistsErr)
mockStorage.EXPECT().DeleteFile(gomock.Any(), gomock.Any()).Return(tc.deleteFileErr).MaxTimes(3)

cfg := &LogWriterConfig{
Dir: "dir",
ChangeFeedID: "test-cf",
CaptureID: "cp",
MaxLogSize: 10,
CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}),
FlushIntervalInMs: 5,
}
writer := LogWriter{
cfg: cfg,
storage: mockStorage,
}
ret := writer.preCleanUpS3(context.Background())
if tc.wantErr != "" {
require.Regexp(t, tc.wantErr, ret.Error(), tc.name)
} else {
require.Nil(t, ret, tc.name)
}
getAllFilesInS3 = origin
}
}

0 comments on commit fbf5f4e

Please sign in to comment.