Skip to content

Commit

Permalink
Merge branch 'master' into fix-40839
Browse files Browse the repository at this point in the history
  • Loading branch information
dsdashun authored Feb 8, 2023
2 parents 29c66b4 + 93c6492 commit 4d73b03
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 17 deletions.
2 changes: 2 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"@com_github_golang_protobuf//proto",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down Expand Up @@ -78,6 +79,7 @@ go_test(
"//tablecodec",
"//util/codec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,16 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
return
case e, ok := <-ch:
if !ok {
log.Info("[log backup advancer] Task watcher exits due to stream ends.")
return
}
log.Info("meet task event", zap.Stringer("event", &e))
log.Info("[log backup advancer] Meet task event", zap.Stringer("event", &e))
if err := c.onTaskEvent(ctx, e); err != nil {
if errors.Cause(e.Err) != context.Canceled {
log.Error("listen task meet error, would reopen.", logutil.ShortError(err))
time.AfterFunc(c.cfg.BackoffTime, func() { c.StartTaskListener(ctx) })
}
log.Info("[log backup advancer] Task watcher exits due to some error.", logutil.ShortError(err))
return
}
}
Expand Down
46 changes: 32 additions & 14 deletions br/pkg/streamhelper/advancer_cliext.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"context"
"encoding/binary"
"fmt"
"io"
"strings"

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -94,6 +97,9 @@ func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (Ta

func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResponse) ([]TaskEvent, error) {
result := make([]TaskEvent, 0, len(resp.Events))
if err := resp.Err(); err != nil {
return nil, err
}
for _, event := range resp.Events {
te, err := t.toTaskEvent(ctx, event)
if err != nil {
Expand All @@ -110,6 +116,7 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
handleResponse := func(resp clientv3.WatchResponse) bool {
events, err := t.eventFromWatch(ctx, resp)
if err != nil {
log.Warn("[log backup advancer] Meet error during receiving the task event.", logutil.ShortError(err))
ch <- errorEvent(err)
return false
}
Expand All @@ -118,33 +125,44 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
}
return true
}
collectRemaining := func() {
log.Info("[log backup advancer] Start collecting remaining events in the channel.", zap.Int("remained", len(c)))
defer log.Info("[log backup advancer] Finish collecting remaining events in the channel.")
for {
select {
case resp, ok := <-c:
if !ok {
return
}
if !handleResponse(resp) {
return
}
default:
return
}
}
}

go func() {
defer close(ch)
for {
select {
case resp, ok := <-c:
failpoint.Inject("advancer_close_channel", func() {
// We cannot really close the channel, just simulating it.
ok = false
})
if !ok {
ch <- errorEvent(io.EOF)
return
}
if !handleResponse(resp) {
return
}
case <-ctx.Done():
// drain the remain event from channel.
for {
select {
case resp, ok := <-c:
if !ok {
return
}
if !handleResponse(resp) {
return
}
default:
return
}
}
collectRemaining()
ch <- errorEvent(ctx.Err())
return
}
}
}()
Expand Down
44 changes: 42 additions & 2 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"context"
"encoding/binary"
"fmt"
"io"
"net"
"net/url"
"path"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
Expand Down Expand Up @@ -143,6 +145,7 @@ func TestIntegration(t *testing.T) {
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("testStoptask", func(t *testing.T) { testStoptask(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamClose", func(t *testing.T) { testStreamClose(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
}

func TestChecking(t *testing.T) {
Expand Down Expand Up @@ -295,6 +298,7 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
taskInfo2 := simpleTask(taskName2, 4)
require.NoError(t, metaCli.PutTask(ctx, taskInfo2))
require.NoError(t, metaCli.DeleteTask(ctx, taskName2))

first := <-ch
require.Equal(t, first.Type, streamhelper.EventAdd)
require.Equal(t, first.Name, taskName)
Expand All @@ -310,8 +314,44 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
require.Equal(t, forth.Type, streamhelper.EventDel)
require.Equal(t, forth.Name, taskName2)
cancel()
_, ok := <-ch
require.False(t, ok)
fifth, ok := <-ch
require.True(t, ok)
require.Equal(t, fifth.Type, streamhelper.EventErr)
require.Error(t, fifth.Err, context.Canceled)
item, ok := <-ch
require.False(t, ok, "%v", item)
}

func testStreamClose(t *testing.T, metaCli streamhelper.AdvancerExt) {
ctx := context.Background()
taskName := "close_simple"
taskInfo := simpleTask(taskName, 4)

require.NoError(t, metaCli.PutTask(ctx, taskInfo))
ch := make(chan streamhelper.TaskEvent, 1024)
require.NoError(t, metaCli.Begin(ctx, ch))
require.NoError(t, metaCli.DeleteTask(ctx, taskName))
first := <-ch
require.Equal(t, first.Type, streamhelper.EventAdd)
require.Equal(t, first.Name, taskName)
require.ElementsMatch(t, first.Ranges, simpleRanges(4))
second := <-ch
require.Equal(t, second.Type, streamhelper.EventDel, "%s", second)
require.Equal(t, second.Name, taskName, "%s", second)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel", "return"))
defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel")
// We need to make the channel file some events hence we can simulate the closed channel.
taskName2 := "close_simple2"
taskInfo2 := simpleTask(taskName2, 4)
require.NoError(t, metaCli.PutTask(ctx, taskInfo2))
require.NoError(t, metaCli.DeleteTask(ctx, taskName2))

third := <-ch
require.Equal(t, third.Type, streamhelper.EventErr)
require.Error(t, third.Err, io.EOF)
item, ok := <-ch
require.False(t, ok, "%#v", item)
}

func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
Expand Down

0 comments on commit 4d73b03

Please sign in to comment.