Skip to content

Commit

Permalink
Merge branch 'master' into issue_29915
Browse files Browse the repository at this point in the history
  • Loading branch information
sylzd authored Nov 23, 2021
2 parents 8340df3 + fb5eb1f commit fbfdb84
Show file tree
Hide file tree
Showing 93 changed files with 4,398 additions and 3,710 deletions.
208 changes: 110 additions & 98 deletions br/pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (

"github.com/golang/mock/gomock"
"github.com/google/uuid"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

Expand All @@ -23,30 +23,25 @@ type backendSuite struct {
ts uint64
}

var _ = Suite(&backendSuite{})

func Test(t *testing.T) {
TestingT(t)
}

// FIXME: Cannot use the real SetUpTest/TearDownTest to set up the mock
// otherwise the mock error will be ignored.

func (s *backendSuite) setUpTest(c gomock.TestReporter) {
s.controller = gomock.NewController(c)
s.mockBackend = mock.NewMockBackend(s.controller)
s.backend = backend.MakeBackend(s.mockBackend)
s.ts = oracle.ComposeTS(time.Now().Unix()*1000, 0)
func createBackendSuite(c gomock.TestReporter) *backendSuite {
controller := gomock.NewController(c)
mockBackend := mock.NewMockBackend(controller)
return &backendSuite{
controller: controller,
mockBackend: mockBackend,
backend: backend.MakeBackend(mockBackend),
ts: oracle.ComposeTS(time.Now().Unix()*1000, 0),
}
}

func (s *backendSuite) tearDownTest() {
s.controller.Finish()
}

func (s *backendSuite) TestOpenCloseImportCleanUpEngine(c *C) {
s.setUpTest(c)
func TestOpenCloseImportCleanUpEngine(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

ctx := context.Background()
engineUUID := uuid.MustParse("902efee3-a3f9-53d4-8c82-f12fb1900cd1")

Expand All @@ -67,17 +62,18 @@ func (s *backendSuite) TestOpenCloseImportCleanUpEngine(c *C) {
After(importCall)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1)
c.Assert(err, IsNil)
require.NoError(t, err)
closedEngine, err := engine.Close(ctx, nil)
c.Assert(err, IsNil)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
c.Assert(err, IsNil)
require.NoError(t, err)
err = closedEngine.Cleanup(ctx)
c.Assert(err, IsNil)
require.NoError(t, err)
}

func (s *backendSuite) TestUnsafeCloseEngine(c *C) {
s.setUpTest(c)
func TestUnsafeCloseEngine(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

ctx := context.Background()
Expand All @@ -92,13 +88,14 @@ func (s *backendSuite) TestUnsafeCloseEngine(c *C) {
After(closeCall)

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", -1)
c.Assert(err, IsNil)
require.NoError(t, err)
err = closedEngine.Cleanup(ctx)
c.Assert(err, IsNil)
require.NoError(t, err)
}

func (s *backendSuite) TestUnsafeCloseEngineWithUUID(c *C) {
s.setUpTest(c)
func TestUnsafeCloseEngineWithUUID(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

ctx := context.Background()
Expand All @@ -113,13 +110,14 @@ func (s *backendSuite) TestUnsafeCloseEngineWithUUID(c *C) {
After(closeCall)

closedEngine, err := s.backend.UnsafeCloseEngineWithUUID(ctx, nil, "some_tag", engineUUID)
c.Assert(err, IsNil)
require.NoError(t, err)
err = closedEngine.Cleanup(ctx)
c.Assert(err, IsNil)
require.NoError(t, err)
}

func (s *backendSuite) TestWriteEngine(c *C) {
s.setUpTest(c)
func TestWriteEngine(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

ctx := context.Background()
Expand All @@ -145,19 +143,20 @@ func (s *backendSuite) TestWriteEngine(c *C) {
Return(nil)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1)
c.Assert(err, IsNil)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{})
c.Assert(err, IsNil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows1)
c.Assert(err, IsNil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows2)
c.Assert(err, IsNil)
require.NoError(t, err)
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
require.NoError(t, err)
}

func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
s.setUpTest(c)
func TestWriteToEngineWithNothing(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

ctx := context.Background()
Expand All @@ -170,17 +169,18 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
s.mockBackend.EXPECT().LocalWriter(ctx, &backend.LocalWriterConfig{}, gomock.Any()).Return(mockWriter, nil)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1)
c.Assert(err, IsNil)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{})
c.Assert(err, IsNil)
require.NoError(t, err)
err = writer.WriteRows(ctx, nil, emptyRows)
c.Assert(err, IsNil)
require.NoError(t, err)
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
require.NoError(t, err)
}

func (s *backendSuite) TestOpenEngineFailed(c *C) {
s.setUpTest(c)
func TestOpenEngineFailed(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

ctx := context.Background()
Expand All @@ -189,11 +189,12 @@ func (s *backendSuite) TestOpenEngineFailed(c *C) {
Return(errors.New("fake unrecoverable open error"))

_, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1)
c.Assert(err, ErrorMatches, "fake unrecoverable open error")
require.EqualError(t, err, "fake unrecoverable open error")
}

func (s *backendSuite) TestWriteEngineFailed(c *C) {
s.setUpTest(c)
func TestWriteEngineFailed(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

ctx := context.Background()
Expand All @@ -209,17 +210,19 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {
mockWriter.EXPECT().Close(ctx).Return(nil, nil)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1)
c.Assert(err, IsNil)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{})
c.Assert(err, IsNil)
require.NoError(t, err)
err = writer.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, "fake unrecoverable write error.*")
require.Error(t, err)
require.Regexp(t, "fake unrecoverable write error.*", err.Error())
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
require.NoError(t, err)
}

func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
s.setUpTest(c)
func TestWriteBatchSendFailedWithRetry(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

ctx := context.Background()
Expand All @@ -235,17 +238,19 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
mockWriter.EXPECT().Close(ctx).Return(nil, nil).MinTimes(1)

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1)
c.Assert(err, IsNil)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{})
c.Assert(err, IsNil)
require.NoError(t, err)
err = writer.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, ".*fake recoverable write batch error")
require.Error(t, err)
require.Regexp(t, ".*fake recoverable write batch error", err.Error())
_, err = writer.Close(ctx)
c.Assert(err, IsNil)
require.NoError(t, err)
}

func (s *backendSuite) TestImportFailedNoRetry(c *C) {
s.setUpTest(c)
func TestImportFailedNoRetry(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

ctx := context.Background()
Expand All @@ -256,13 +261,15 @@ func (s *backendSuite) TestImportFailedNoRetry(c *C) {
Return(errors.Annotate(context.Canceled, "fake unrecoverable import error"))

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
c.Assert(err, IsNil)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
c.Assert(err, ErrorMatches, "fake unrecoverable import error.*")
require.Error(t, err)
require.Regexp(t, "fake unrecoverable import error.*", err.Error())
}

func (s *backendSuite) TestImportFailedWithRetry(c *C) {
s.setUpTest(c)
func TestImportFailedWithRetry(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

ctx := context.Background()
Expand All @@ -275,13 +282,15 @@ func (s *backendSuite) TestImportFailedWithRetry(c *C) {
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
c.Assert(err, IsNil)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
c.Assert(err, ErrorMatches, ".*fake recoverable import error")
require.Error(t, err)
require.Regexp(t, ".*fake recoverable import error", err.Error())
}

func (s *backendSuite) TestImportFailedRecovered(c *C) {
s.setUpTest(c)
func TestImportFailedRecovered(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

ctx := context.Background()
Expand All @@ -296,46 +305,49 @@ func (s *backendSuite) TestImportFailedRecovered(c *C) {
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
c.Assert(err, IsNil)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
c.Assert(err, IsNil)
require.NoError(t, err)
}

//nolint:interfacer // change test case signature causes check panicking.
func (s *backendSuite) TestClose(c *C) {
s.setUpTest(c)
func TestClose(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

s.mockBackend.EXPECT().Close().Return()

s.backend.Close()
}

func (s *backendSuite) TestMakeEmptyRows(c *C) {
s.setUpTest(c)
func TestMakeEmptyRows(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

rows := mock.NewMockRows(s.controller)
s.mockBackend.EXPECT().MakeEmptyRows().Return(rows)

c.Assert(s.mockBackend.MakeEmptyRows(), Equals, rows)
require.Equal(t, rows, s.mockBackend.MakeEmptyRows())
}

func (s *backendSuite) TestNewEncoder(c *C) {
s.setUpTest(c)
func TestNewEncoder(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

encoder := mock.NewMockEncoder(s.controller)
options := &kv.SessionOptions{SQLMode: mysql.ModeANSIQuotes, Timestamp: 1234567890}
s.mockBackend.EXPECT().NewEncoder(nil, options).Return(encoder, nil)

realEncoder, err := s.mockBackend.NewEncoder(nil, options)
c.Assert(realEncoder, Equals, encoder)
c.Assert(err, IsNil)
require.Equal(t, realEncoder, encoder)
require.NoError(t, err)
}

func (s *backendSuite) TestCheckDiskQuota(c *C) {
s.setUpTest(c)
func TestCheckDiskQuota(t *testing.T) {
t.Parallel()
s := createBackendSuite(t)
defer s.tearDownTest()

uuid1 := uuid.MustParse("11111111-1111-1111-1111-111111111111")
Expand Down Expand Up @@ -381,29 +393,29 @@ func (s *backendSuite) TestCheckDiskQuota(c *C) {

// No quota exceeded
le, iple, ds, ms := s.backend.CheckDiskQuota(30000)
c.Assert(le, HasLen, 0)
c.Assert(iple, Equals, 0)
c.Assert(ds, Equals, int64(9000))
c.Assert(ms, Equals, int64(16000))
require.Len(t, le, 0)
require.Equal(t, 0, iple)
require.Equal(t, int64(9000), ds)
require.Equal(t, int64(16000), ms)

// Quota exceeded, the largest one is out
le, iple, ds, ms = s.backend.CheckDiskQuota(20000)
c.Assert(le, DeepEquals, []uuid.UUID{uuid9})
c.Assert(iple, Equals, 0)
c.Assert(ds, Equals, int64(9000))
c.Assert(ms, Equals, int64(16000))
require.Equal(t, []uuid.UUID{uuid9}, le)
require.Equal(t, 0, iple)
require.Equal(t, int64(9000), ds)
require.Equal(t, int64(16000), ms)

// Quota exceeded, the importing one should be ranked least priority
le, iple, ds, ms = s.backend.CheckDiskQuota(12000)
c.Assert(le, DeepEquals, []uuid.UUID{uuid5, uuid9})
c.Assert(iple, Equals, 0)
c.Assert(ds, Equals, int64(9000))
c.Assert(ms, Equals, int64(16000))
require.Equal(t, []uuid.UUID{uuid5, uuid9}, le)
require.Equal(t, 0, iple)
require.Equal(t, int64(9000), ds)
require.Equal(t, int64(16000), ms)

// Quota exceeded, the importing ones should not be visible
le, iple, ds, ms = s.backend.CheckDiskQuota(5000)
c.Assert(le, DeepEquals, []uuid.UUID{uuid1, uuid5, uuid9})
c.Assert(iple, Equals, 1)
c.Assert(ds, Equals, int64(9000))
c.Assert(ms, Equals, int64(16000))
require.Equal(t, []uuid.UUID{uuid1, uuid5, uuid9}, le)
require.Equal(t, 1, iple)
require.Equal(t, int64(9000), ds)
require.Equal(t, int64(16000), ms)
}
Loading

0 comments on commit fbfdb84

Please sign in to comment.