Skip to content

Commit

Permalink
Backport vitessio#13582
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Aug 12, 2024
1 parent 1b333db commit 2c4f6cf
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 88 deletions.
76 changes: 38 additions & 38 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,26 @@ limitations under the License.
package discovery

import (
"context"
"fmt"
"io"
"math/rand"
"sort"
"strings"
"sync"
"time"

"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"

"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"vitess.io/vitess/go/vt/log"

"context"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

type TabletPickerCellPreference int
Expand Down Expand Up @@ -291,13 +289,12 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo
return candidates
}

// PickForStreaming picks an available tablet.
// PickForStreaming picks a tablet that is healthy and serving.
// Selection is based on CellPreference.
// See prioritizeTablets for prioritization logic.
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) {
rand.Seed(time.Now().UnixNano())
// keep trying at intervals (tabletPickerRetryDelay) until a tablet is found
// or the context is canceled
// Keep trying at intervals (tabletPickerRetryDelay) until a healthy
// serving tablet is found or the context is cancelled.
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -330,15 +327,15 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
} else if tp.inOrder {
candidates = tp.orderByTabletType(candidates)
} else {
// Randomize candidates
// Randomize candidates.
rand.Shuffle(len(candidates), func(i, j int) {
candidates[i], candidates[j] = candidates[j], candidates[i]
})
}
if len(candidates) == 0 {
// if no candidates were found, sleep and try again
// If no viable candidates were found, sleep and try again.
tp.incNoTabletFoundStat()
log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds",
log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0)
timer := time.NewTimer(GetTabletPickerRetryDelay())
select {
Expand All @@ -349,19 +346,8 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
}
continue
}
for _, ti := range candidates {
// try to connect to tablet
if conn, err := tabletconn.GetDialer()(ti.Tablet, true); err == nil {
// OK to use ctx here because it is not actually used by the underlying Close implementation
_ = conn.Close(ctx)
log.Infof("tablet picker found tablet %s", ti.Tablet.String())
return ti.Tablet, nil
}
// err found
log.Warningf("unable to connect to tablet for alias %v", ti.Alias)
}
// Got here? Means we iterated all tablets and did not find a healthy one
tp.incNoTabletFoundStat()
log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String())
return candidates[0].Tablet, nil
}
}

Expand Down Expand Up @@ -427,33 +413,47 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
if len(aliases) == 0 {
return nil
}

shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases)
if err != nil {
log.Warningf("error fetching tablets from topo: %v", err)
// If we get a partial result we can still use it, otherwise return
log.Warningf("Error fetching tablets from topo: %v", err)
// If we get a partial result we can still use it, otherwise return.
if len(tabletMap) == 0 {
return nil
}
}

tablets := make([]*topo.TabletInfo, 0, len(aliases))
for _, tabletAlias := range aliases {
tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)]
if !ok {
// Either tablet disappeared on us, or we got a partial result (GetTabletMap ignores
// topo.ErrNoNode). Just log a warning
log.Warningf("failed to load tablet %v", tabletAlias)
// Either tablet disappeared on us, or we got a partial result
// (GetTabletMap ignores topo.ErrNoNode); just log a warning.
log.Warningf("Tablet picker failed to load tablet %v", tabletAlias)
} else if topoproto.IsTypeInList(tabletInfo.Type, tp.tabletTypes) {
tablets = append(tablets, tabletInfo)
// Try to connect to the tablet and confirm that it's usable.
if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil {
// Ensure that the tablet is healthy and serving.
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error {
if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" {
return io.EOF // End the stream
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
}); err == nil || err == io.EOF {
tablets = append(tablets, tabletInfo)
}
_ = conn.Close(ctx)
}
}
}
return tablets
}

func init() {
// TODO(sougou): consolidate this call to be once per process.
rand.Seed(time.Now().UnixNano())
globalTPStats = newTabletPickerStats()
}

Expand Down
65 changes: 54 additions & 11 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

func TestPickPrimary(t *testing.T) {
Expand Down Expand Up @@ -503,6 +504,45 @@ func TestPickErrorOnlySpecified(t *testing.T) {
require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell.ks.0.replica"], int64(0))
}

// TestPickFallbackType tests that when providing a list of tablet types to
// pick from, with the list in preference order, that when the primary/first
// type has no available healthy serving tablets that we select a healthy
// serving tablet from the secondary/second type.
func TestPickFallbackType(t *testing.T) {
cells := []string{"cell1", "cell2"}
localCell := cells[0]
tabletTypes := "replica,primary"
options := TabletPickerOptions{
TabletOrder: "InOrder",
}
te := newPickerTestEnv(t, cells)

// This one should be selected even though it's the secondary type
// as it is healthy and serving.
primaryTablet := addTablet(te, 100, topodatapb.TabletType_PRIMARY, localCell, true, true)
defer deleteTablet(t, te, primaryTablet)

// Replica tablet should not be selected as it is unhealthy.
replicaTablet := addTablet(te, 200, topodatapb.TabletType_REPLICA, localCell, false, false)
defer deleteTablet(t, te, replicaTablet)

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
_, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryTablet.Alias
return nil
})
require.NoError(t, err)

tp, err := NewTabletPicker(context.Background(), te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel2()
tablet, err := tp.PickForStreaming(ctx2)
require.NoError(t, err)
assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet)
}

type pickerTestEnv struct {
t *testing.T
keyspace string
Expand Down Expand Up @@ -551,18 +591,21 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell
err := te.topoServ.CreateTablet(context.Background(), tablet)
require.NoError(te.t, err)

shr := &querypb.StreamHealthResponse{
Serving: serving,
Target: &querypb.Target{
Keyspace: te.keyspace,
Shard: te.shard,
TabletType: tabletType,
},
RealtimeStats: &querypb.RealtimeStats{HealthError: "tablet is unhealthy"},
}
if healthy {
_ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{
Serving: serving,
Target: &querypb.Target{
Keyspace: te.keyspace,
Shard: te.shard,
TabletType: tabletType,
},
RealtimeStats: &querypb.RealtimeStats{HealthError: ""},
})
shr.RealtimeStats.HealthError = ""
}

_ = createFixedHealthConn(tablet, shr)

return tablet
}

Expand Down
62 changes: 43 additions & 19 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strconv"
"strings"
"sync/atomic"
"time"

"google.golang.org/protobuf/encoding/prototext"
Expand All @@ -29,7 +30,6 @@ import (

"context"

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/tb"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
Expand All @@ -40,6 +40,13 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
// How many times to retry tablet selection before we
// give up and return an error message that the user
// can see and act upon if needed.
tabletPickerRetries = 5
)

// controller is created by Engine. Members are initialized upfront.
// There is no mutex within a controller becaust its members are
// either read-only or self-synchronized.
Expand All @@ -59,7 +66,7 @@ type controller struct {
done chan struct{}

// The following fields are updated after start. So, they need synchronization.
sourceTablet sync2.AtomicString
sourceTablet atomic.Value

lastWorkflowError *lastError
}
Expand All @@ -79,6 +86,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
done: make(chan struct{}),
source: &binlogdatapb.BinlogSource{},
}
ct.sourceTablet.Store(&topodatapb.TabletAlias{})
log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)

// id
Expand Down Expand Up @@ -173,7 +181,7 @@ func (ct *controller) run(ctx context.Context) {

func (ct *controller) runBlp(ctx context.Context) (err error) {
defer func() {
ct.sourceTablet.Set("")
ct.sourceTablet.Store(&topodatapb.TabletAlias{})
if x := recover(); x != nil {
log.Errorf("stream %v: caught panic: %v\n%s", ct.id, x, tb.Stack(4))
err = fmt.Errorf("panic: %v", x)
Expand Down Expand Up @@ -203,22 +211,9 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
}
defer dbClient.Close()

var tablet *topodatapb.Tablet
if ct.source.GetExternalMysql() == "" {
log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id)
tablet, err = ct.tabletPicker.PickForStreaming(ctx)
if err != nil {
select {
case <-ctx.Done():
default:
ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1)
ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error()))
}
return err
}
ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String()))
log.Infof("found a tablet eligible for vreplication. stream id: %v tablet: %s", ct.id, tablet.Alias.String())
ct.sourceTablet.Set(tablet.Alias.String())
tablet, err := ct.pickSourceTablet(ctx, dbClient)
if err != nil {
return err
}
switch {
case len(ct.source.Tables) > 0:
Expand Down Expand Up @@ -294,6 +289,35 @@ func (ct *controller) setMessage(dbClient binlogplayer.DBClient, message string)
}
return nil
}

// pickSourceTablet picks a healthy serving tablet to source for
// the vreplication stream. If the source is marked as external, it
// returns nil.
func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplayer.DBClient) (*topodatapb.Tablet, error) {
if ct.source.GetExternalMysql() != "" {
return nil, nil
}
log.Infof("Trying to find an eligible source tablet for vreplication stream id %d for workflow: %s",
ct.id, ct.workflow)
tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries)
defer tpCancel()
tablet, err := ct.tabletPicker.PickForStreaming(tpCtx)
if err != nil {
select {
case <-ctx.Done():
default:
ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1)
ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error()))
}
return tablet, err
}
ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String()))
log.Infof("Found eligible source tablet %s for vreplication stream id %d for workflow %s",
tablet.Alias.String(), ct.id, ct.workflow)
ct.sourceTablet.Store(tablet.Alias)
return tablet, err
}

func (ct *controller) Stop() {
ct.cancel()
<-ct.done
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error {
return fmt.Errorf("unexpected result: %v", qr)
}

// When err is not nil then we got a retryable error and will loop again
// When err is not nil then we got a retryable error and will loop again.
if err == nil {
current, dcerr := binlogplayer.DecodePosition(qr.Rows[0][0].ToString())
if dcerr != nil {
Expand Down
Loading

0 comments on commit 2c4f6cf

Please sign in to comment.