Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redolog: add a precleanup process when s3 enable #3525

Merged
merged 14 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cdc/redo/writer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,10 @@ func (w *Writer) shouldRemoved(checkPointTs uint64, f os.FileInfo) (bool, error)
func (w *Writer) getShouldRemovedFiles(checkPointTs uint64) ([]os.FileInfo, error) {
files, err := ioutil.ReadDir(w.cfg.Dir)
if err != nil {
if os.IsNotExist(err) {
log.Warn("check removed log dir fail", zap.Error(err))
return []os.FileInfo{}, nil
}
return nil, cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotatef(err, "can't read log file directory: %s", w.cfg.Dir))
}

Expand Down
11 changes: 10 additions & 1 deletion cdc/redo/writer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,17 @@ func TestWriterGC(t *testing.T) {
require.Nil(t, err, files[0].Name())
require.EqualValues(t, 3, ts)
require.Equal(t, common.DefaultRowLogFileType, fileType)

time.Sleep(time.Duration(100) * time.Millisecond)

w1 := &Writer{
cfg: cfg,
uint64buf: make([]byte, 8),
storage: mockStorage,
}
w1.cfg.Dir += "not-exist"
w1.running.Store(true)
err = w1.GC(111)
require.Nil(t, err)
}

func TestAdvanceTs(t *testing.T) {
Expand Down
53 changes: 52 additions & 1 deletion cdc/redo/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ func NewLogWriter(ctx context.Context, cfg *LogWriterConfig) (*LogWriter, error)
if err != nil {
return nil, err
}
} else {
if cfg.S3Storage {
// since other process get the remove changefeed job async, may still write some logs after owner delete the log
err = logWriter.preCleanUpS3(ctx)
if err != nil {
return nil, err
}
}
}

logWriter.metricTotalRowsCount = redoTotalRowsCountGauge.WithLabelValues(cfg.CaptureID, cfg.ChangeFeedID)
Expand All @@ -186,6 +194,38 @@ func NewLogWriter(ctx context.Context, cfg *LogWriterConfig) (*LogWriter, error)
return logWriter, nil
}

func (l *LogWriter) preCleanUpS3(ctx context.Context) error {
ret, err := l.storage.FileExists(ctx, l.getDeletedChangefeedMarker())
if err != nil {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
}
if !ret {
return nil
}

files, err := getAllFilesInS3(ctx, l)
if err != nil {
return err
}

ff := []string{}
for _, file := range files {
if file != l.getDeletedChangefeedMarker() {
ff = append(ff, file)
}
}
err = l.deleteFilesInS3(ctx, ff)
if err != nil {
return err
}
err = l.storage.DeleteFile(ctx, l.getDeletedChangefeedMarker())
if !isNotExistInS3(err) {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
}

return nil
}

func (l *LogWriter) initMeta(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -441,7 +481,18 @@ func (l *LogWriter) DeleteAllLogs(ctx context.Context) error {
}
// after delete logs, rm the LogWriter since it is already closed
l.cleanUpLogWriter()
return nil

// write a marker to s3, since other process get the remove changefeed job async,
// may still write some logs after owner delete the log
return l.writeDeletedMarkerToS3(ctx)
}

func (l *LogWriter) getDeletedChangefeedMarker() string {
return fmt.Sprintf("delete_%s", l.cfg.ChangeFeedID)
}

func (l *LogWriter) writeDeletedMarkerToS3(ctx context.Context) error {
return cerror.WrapError(cerror.ErrS3StorageAPI, l.storage.WriteFile(ctx, l.getDeletedChangefeedMarker(), []byte("D")))
}
Comment on lines +494 to 496
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the delete marker file written to S3, and the redo log dir in S3 is not deleted after changefeed is removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the bucket is still there, no extra cost. but currently we do not delete the bucket even without the marker, just delete the object


func (l *LogWriter) cleanUpLogWriter() {
Expand Down
109 changes: 108 additions & 1 deletion cdc/redo/writer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"io/ioutil"
"math"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/pingcap/ticdc/cdc/redo/common"
cerror "github.com/pingcap/ticdc/pkg/errors"
mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
Expand Down Expand Up @@ -676,6 +678,31 @@ func TestNewLogWriter(t *testing.T) {
require.Equal(t, meta.ResolvedTs, l.meta.ResolvedTs)
require.Equal(t, map[int64]uint64{}, l.meta.ResolvedTsList)
time.Sleep(time.Millisecond * time.Duration(math.Max(float64(defaultFlushIntervalInMs), float64(defaultGCIntervalInMs))+1))

origin := common.InitS3storage
defer func() {
common.InitS3storage = origin
}()
controller := gomock.NewController(t)
mockStorage := mockstorage.NewMockExternalStorage(controller)
// skip pre cleanup
mockStorage.EXPECT().FileExists(gomock.Any(), gomock.Any()).Return(false, nil)
common.InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
return mockStorage, nil
}
cfg3 := &LogWriterConfig{
Dir: dir,
ChangeFeedID: "test-cf112232",
CaptureID: "cp",
MaxLogSize: 10,
CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}),
FlushIntervalInMs: 5,
S3Storage: true,
}
l3, err := NewLogWriter(ctx, cfg3)
require.Nil(t, err)
err = l3.Close()
require.Nil(t, err)
}

func TestWriterRedoGC(t *testing.T) {
Expand Down Expand Up @@ -751,6 +778,7 @@ func TestDeleteAllLogs(t *testing.T) {
closeErr error
getAllFilesInS3Err error
deleteFileErr error
writeFileErr error
wantErr string
}{
{
Expand Down Expand Up @@ -784,6 +812,12 @@ func TestDeleteAllLogs(t *testing.T) {
args: args{enableS3: true},
deleteFileErr: awserr.New(s3.ErrCodeNoSuchKey, "no such key", nil),
},
{
name: "writerFile err",
args: args{enableS3: true},
writeFileErr: errors.New("xx"),
wantErr: ".*xx*.",
},
}

for _, tt := range tests {
Expand All @@ -804,6 +838,8 @@ func TestDeleteAllLogs(t *testing.T) {
mockStorage := mockstorage.NewMockExternalStorage(controller)

mockStorage.EXPECT().DeleteFile(gomock.Any(), gomock.Any()).Return(tt.deleteFileErr).MaxTimes(2)
mockStorage.EXPECT().WriteFile(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.writeFileErr).MaxTimes(1)

mockWriter := &mockFileWriter{}
mockWriter.On("Close").Return(tt.closeErr)
cfg := &LogWriterConfig{
Expand All @@ -830,7 +866,8 @@ func TestDeleteAllLogs(t *testing.T) {
require.Regexp(t, tt.wantErr, ret.Error(), tt.name)
} else {
require.Nil(t, ret, tt.name)
require.Equal(t, 0, len(logWriters), tt.name)
_, ok := logWriters[writer.cfg.ChangeFeedID]
require.False(t, ok, tt.name)
if !tt.args.enableS3 {
_, err := os.Stat(dir)
require.True(t, os.IsNotExist(err), tt.name)
Expand All @@ -840,3 +877,73 @@ func TestDeleteAllLogs(t *testing.T) {
getAllFilesInS3 = origin
}
}

func TestPreCleanUpS3(t *testing.T) {
testCases := []struct {
name string
fileExistsErr error
fileExists bool
getAllFilesInS3Err error
deleteFileErr error
wantErr string
}{
{
name: "happy no marker",
fileExists: false,
},
{
name: "fileExists err",
fileExistsErr: errors.New("xx"),
wantErr: ".*xx*.",
},
{
name: "getAllFilesInS3 err",
fileExists: true,
getAllFilesInS3Err: errors.New("xx"),
wantErr: ".*xx*.",
},
{
name: "deleteFile normal err",
fileExists: true,
deleteFileErr: errors.New("xx"),
wantErr: ".*ErrS3StorageAPI*.",
},
{
name: "deleteFile notExist err",
fileExists: true,
deleteFileErr: awserr.New(s3.ErrCodeNoSuchKey, "no such key", nil),
},
}

for _, tc := range testCases {
origin := getAllFilesInS3
getAllFilesInS3 = func(ctx context.Context, l *LogWriter) ([]string, error) {
return []string{"1", "11", "delete_test-cf"}, tc.getAllFilesInS3Err
}
controller := gomock.NewController(t)
mockStorage := mockstorage.NewMockExternalStorage(controller)

mockStorage.EXPECT().FileExists(gomock.Any(), gomock.Any()).Return(tc.fileExists, tc.fileExistsErr)
mockStorage.EXPECT().DeleteFile(gomock.Any(), gomock.Any()).Return(tc.deleteFileErr).MaxTimes(3)

cfg := &LogWriterConfig{
Dir: "dir",
ChangeFeedID: "test-cf",
CaptureID: "cp",
MaxLogSize: 10,
CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}),
FlushIntervalInMs: 5,
}
writer := LogWriter{
cfg: cfg,
storage: mockStorage,
}
ret := writer.preCleanUpS3(context.Background())
if tc.wantErr != "" {
require.Regexp(t, tc.wantErr, ret.Error(), tc.name)
} else {
require.Nil(t, ret, tc.name)
}
getAllFilesInS3 = origin
}
}