Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport #14224 #145

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ type TabletPicker struct {
inOrder bool
cellPref TabletPickerCellPreference
localCellInfo localCellInfo
// This map is keyed on the results of TabletAlias.String().
ignoreTablets map[string]struct{}
}

// NewTabletPicker returns a TabletPicker.
Expand All @@ -146,6 +148,7 @@ func NewTabletPicker(
cells []string,
localCell, keyspace, shard, tabletTypesStr string,
options TabletPickerOptions,
ignoreTablets ...*topodatapb.TabletAlias,
) (*TabletPicker, error) {
// Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted.
tabletTypes, inOrder, err := ParseTabletTypesAndOrder(tabletTypesStr)
Expand Down Expand Up @@ -218,7 +221,7 @@ func NewTabletPicker(
}
}

return &TabletPicker{
tp := &TabletPicker{
ts: ts,
cells: dedupeCells(cells),
localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap},
Expand All @@ -227,7 +230,14 @@ func NewTabletPicker(
tabletTypes: tabletTypes,
inOrder: inOrder,
cellPref: cellPref,
}, nil
ignoreTablets: make(map[string]struct{}, len(ignoreTablets)),
}

for _, ignoreTablet := range ignoreTablets {
tp.ignoreTablets[ignoreTablet.String()] = struct{}{}
}

return tp, nil
}

// dedupeCells is used to remove duplicates in the cell list in case it is passed in
Expand Down Expand Up @@ -369,6 +379,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
return nil
}
aliases = append(aliases, si.PrimaryAlias)
if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore {
aliases = append(aliases, si.PrimaryAlias)
}
} else {
actualCells := make([]string, 0)
for _, cell := range tp.cells {
Expand Down Expand Up @@ -404,7 +417,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
}

for _, node := range sri.Nodes {
aliases = append(aliases, node.TabletAlias)
if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore {
aliases = append(aliases, node.TabletAlias)
}
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,28 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) {
assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want)
}

func TestPickWithIgnoreList(t *testing.T) {
ctx := context.Background()
te := newPickerTestEnv(t, []string{"cell1", "cell2"})

want := addTablet(te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true)
defer deleteTablet(t, te, want)

dontWant := addTablet(te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true)
defer deleteTablet(t, te, dontWant)

// Specify the alias as the cell.
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, dontWant.GetAlias())
require.NoError(t, err)

// Try it many times to be sure we don't ever pick from the ignore list.
for i := 0; i < 100; i++ {
tablet, err := tp.PickForStreaming(ctx)
require.NoError(t, err)
require.False(t, proto.Equal(dontWant, tablet), "Picked the tablet we shouldn't have: %v", dontWant)
}
}

func TestPickUsingCellAliasOnlySpecified(t *testing.T) {
// test env puts all cells into an alias called "cella"
te := newPickerTestEnv(t, []string{"cell", "otherCell"})
Expand Down
48 changes: 45 additions & 3 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type vstreamManager struct {
// maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set
const maxSkewTimeoutSeconds = 10 * 60

// tabletPickerContextTimeout is the timeout for the child context used to select candidate tablets
// for a vstream
const tabletPickerContextTimeout = 90 * time.Second

// vstream contains the metadata for one VStream request.
type vstream struct {
// mu protects parts of vgtid, the semantics of a send, and journaler.
Expand Down Expand Up @@ -130,6 +134,7 @@ type journalEvent struct {

func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager {
exporter := servenv.NewExporter(cell, "VStreamManager")

return &vstreamManager{
resolver: resolver,
toposerv: serv,
Expand Down Expand Up @@ -481,6 +486,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// journalDone is assigned a channel when a journal event is encountered.
// It will be closed when all journal events converge.
var journalDone chan struct{}
ignoreTablets := make([]*topodatapb.TabletAlias, 0)

errCount := 0
for {
Expand All @@ -498,12 +504,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var eventss [][]*binlogdatapb.VEvent
var err error
cells := vs.getCells()
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions)
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...)
if err != nil {
log.Errorf(err.Error())
return err
}
tablet, err := tp.PickForStreaming(ctx)

// Create a child context with a stricter timeout when picking a tablet.
// This will prevent hanging in the case no tablets are found.
tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout)
defer tpCancel()

tablet, err := tp.PickForStreaming(tpCtx)
if err != nil {
log.Errorf(err.Error())
return err
Expand Down Expand Up @@ -678,11 +690,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Unreachable.
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly")
}
if vterrors.Code(err) != vtrpcpb.Code_FAILED_PRECONDITION && vterrors.Code(err) != vtrpcpb.Code_UNAVAILABLE {
retry, ignoreTablet := vs.shouldRetry(err)
if !retry {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
}
if ignoreTablet {
ignoreTablets = append(ignoreTablets, tablet.GetAlias())
}

errCount++
// Retry, at most, 3 times if the error can be retried.
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
Expand All @@ -691,6 +709,30 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
}

// shouldRetry determines whether we should exit immediately or retry the vstream.
// The first return value determines if the error can be retried, while the second
// indicates whether the tablet with which the error occurred should be ommitted
// from the candidate list of tablets to choose from on the retry.
//
// An error should be retried if it is expected to be transient.
// A tablet should be ignored upon retry if it's likely another tablet will not
// produce the same error.
func (vs *vstream) shouldRetry(err error) (bool, bool) {
errCode := vterrors.Code(err)

if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE {
return true, false
}

// If there is a GTIDSet Mismatch on the tablet, omit it from the candidate
// list in the TabletPicker on retry.
if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") {
return true, true
}

return false, false
}

// sendAll sends a group of events together while holding the lock.
func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error {
vs.mu.Lock()
Expand Down
171 changes: 136 additions & 35 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,47 +390,132 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches")
}

func TestVStreamRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func TestVStreamRetriableErrors(t *testing.T) {
type testCase struct {
name string
code vtrpcpb.Code
msg string
shouldRetry bool
ignoreTablet bool
}

cell := "aa"
ks := "TestVStream"
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
tcases := []testCase{
{
name: "failed precondition",
code: vtrpcpb.Code_FAILED_PRECONDITION,
msg: "",
shouldRetry: true,
ignoreTablet: false,
},
{
name: "gtid mismatch",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "GTIDSet Mismatch aa",
shouldRetry: true,
ignoreTablet: true,
},
{
name: "unavailable",
code: vtrpcpb.Code_UNAVAILABLE,
msg: "",
shouldRetry: true,
ignoreTablet: false,
},
{
name: "should not retry",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "final error",
shouldRetry: false,
ignoreTablet: false,
},
}

st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
commit := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_COMMIT},
}
sbc0.AddVStreamEvents(commit, nil)
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa"))
sbc0.AddVStreamEvents(commit, nil)
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error"))
var count sync2.AtomicInt32
count.Set(0)
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "-20",
Gtid: "pos",
}},
}
err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
count.Add(1)
return nil
})
wantErr := "final error"
if err == nil || !strings.Contains(err.Error(), wantErr) {
t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr)

want := &binlogdatapb.VStreamResponse{Events: commit}

for _, tcase := range tcases {
t.Run(tcase.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// aa will be the local cell for this test, but that tablet will have a vstream error.
cells := []string{"aa", "ab"}

ks := "TestVStream"
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)

st := getSandboxTopoMultiCell(ctx, cells, ks, []string{"-20"})

sbc0 := hc.AddTestTablet(cells[0], "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil)
sbc1 := hc.AddTestTablet(cells[1], "1.1.1.1", 1002, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil)

addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
addTabletToSandboxTopo(t, st, ks, "-20", sbc1.Tablet())

vsm := newTestVStreamManager(hc, st, cells[0], false)

// Always have the local cell tablet error so it's ignored on retry and we pick the other one
// if the error requires ignoring the tablet on retry.
sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg))

if tcase.ignoreTablet {
sbc1.AddVStreamEvents(commit, nil)
} else {
sbc0.AddVStreamEvents(commit, nil)
}

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "-20",
Gtid: "pos",
}},
}

ch := make(chan *binlogdatapb.VStreamResponse)
done := make(chan struct{})
go func() {
err := vsm.VStream(ctx, topodatapb.TabletType_REPLICA, vgtid, nil, &vtgatepb.VStreamFlags{Cells: strings.Join(cells, ",")}, func(events []*binlogdatapb.VEvent) error {
ch <- &binlogdatapb.VStreamResponse{Events: events}
return nil
})
wantErr := "context canceled"

if !tcase.shouldRetry {
wantErr = tcase.msg
}

if err == nil || !strings.Contains(err.Error(), wantErr) {
t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr)
}
close(done)
}()

Loop:
for {
if tcase.shouldRetry {
select {
case event := <-ch:
got := proto.Clone(event).(*binlogdatapb.VStreamResponse)
if !proto.Equal(got, want) {
t.Errorf("got different vstream event than expected")
}
cancel()
case <-done:
// The goroutine has completed, so break out of the loop
break Loop
}
} else {
<-done
break Loop
}
}
})
}
time.Sleep(100 * time.Millisecond) // wait for goroutine within VStream to finish
assert.Equal(t, int32(2), count.Get())
}

func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) {
Expand Down Expand Up @@ -1315,6 +1400,22 @@ func getSandboxTopo(ctx context.Context, cell string, keyspace string, shards []
return st
}

func getSandboxTopoMultiCell(ctx context.Context, cells []string, keyspace string, shards []string) *sandboxTopo {
st := newSandboxForCells(cells)
ts := st.topoServer

for _, cell := range cells {
ts.CreateCellInfo(ctx, cell, &topodatapb.CellInfo{})
}

ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})

for _, shard := range shards {
ts.CreateShard(ctx, keyspace, shard)
}
return st
}

func addTabletToSandboxTopo(t *testing.T, st *sandboxTopo, ks, shard string, tablet *topodatapb.Tablet) {
_, err := st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = tablet.Alias
Expand Down
Loading