Skip to content

Commit

Permalink
[wrangler] cleanup unused functions
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Mason <andrew@planetscale.com>
  • Loading branch information
Andrew Mason committed Aug 28, 2023
1 parent 02d2aba commit 1d36207
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 181 deletions.
4 changes: 0 additions & 4 deletions go/vt/wrangler/fake_dbclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,6 @@ func (dc *fakeDBClient) Rollback() error {
func (dc *fakeDBClient) Close() {
}

func (dc *fakeDBClient) id() string {
return fmt.Sprintf("FakeDBClient(%s)", dc.name)
}

// ExecuteFetch is part of the DBClient interface
func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
dc.mu.Lock()
Expand Down
134 changes: 0 additions & 134 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -93,112 +92,6 @@ func (wr *Wrangler) validateNewWorkflow(ctx context.Context, keyspace, workflow
return allErrors.AggrError(vterrors.Aggregate)
}

func (wr *Wrangler) printShards(ctx context.Context, si []*topo.ShardInfo) error {
for _, si := range si {
wr.Logger().Printf(" Shard: %v\n", si.ShardName())
if len(si.SourceShards) != 0 {
wr.Logger().Printf(" Source Shards: %v\n", si.SourceShards)
}
ti, err := wr.ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
return err
}
qr, err := wr.tmc.VReplicationExec(ctx, ti.Tablet, fmt.Sprintf("select * from _vt.vreplication where db_name=%v", encodeString(ti.DbName())))
if err != nil {
return err
}
res := sqltypes.Proto3ToResult(qr)
if len(res.Rows) != 0 {
wr.Logger().Printf(" VReplication:\n")
for _, row := range res.Rows {
wr.Logger().Printf(" %v\n", row)
}
}
wr.Logger().Printf(" Is Primary Serving: %v\n", si.IsPrimaryServing)
if len(si.TabletControls) != 0 {
wr.Logger().Printf(" Tablet Controls: %v\n", si.TabletControls)
}
}
return nil
}

func (wr *Wrangler) getPrimaryPositions(ctx context.Context, shards []*topo.ShardInfo) (map[*topo.ShardInfo]string, error) {
mu := sync.Mutex{}
result := make(map[*topo.ShardInfo]string)

wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
for _, si := range shards {
wg.Add(1)
go func(si *topo.ShardInfo) {
defer wg.Done()
wr.Logger().Infof("Gathering primary position for %v", topoproto.TabletAliasString(si.PrimaryAlias))
ti, err := wr.ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
rec.RecordError(err)
return
}

pos, err := wr.tmc.PrimaryPosition(ctx, ti.Tablet)
if err != nil {
rec.RecordError(err)
return
}

wr.Logger().Infof("Got primary position for %v", topoproto.TabletAliasString(si.PrimaryAlias))
mu.Lock()
result[si] = pos
mu.Unlock()
}(si)
}
wg.Wait()
return result, rec.Error()
}

func (wr *Wrangler) waitForFilteredReplication(ctx context.Context, sourcePositions map[*topo.ShardInfo]string, destinationShards []*topo.ShardInfo, waitTime time.Duration) error {
wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
for _, si := range destinationShards {
wg.Add(1)
go func(si *topo.ShardInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, waitTime)
defer cancel()

var pos string
for _, sourceShard := range si.SourceShards {
// find the position it should be at
for s, sp := range sourcePositions {
if s.Keyspace() == sourceShard.Keyspace && s.ShardName() == sourceShard.Shard {
pos = sp
break
}
}

// and wait for it
wr.Logger().Infof("Waiting for %v to catch up", topoproto.TabletAliasString(si.PrimaryAlias))
ti, err := wr.ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
rec.RecordError(err)
return
}

if err := wr.tmc.VReplicationWaitForPos(ctx, ti.Tablet, sourceShard.Uid, pos); err != nil {
if strings.Contains(err.Error(), "not found") {
wr.Logger().Infof("%v stream %d was not found. Skipping wait.", topoproto.TabletAliasString(si.PrimaryAlias), sourceShard.Uid)
} else {
rec.RecordError(err)
}
} else {
wr.Logger().Infof("%v caught up", topoproto.TabletAliasString(si.PrimaryAlias))
}
}
}(si)
}
wg.Wait()
return rec.Error()
}

// refreshPrimaryTablets will just RPC-ping all the primary tablets with RefreshState
func (wr *Wrangler) refreshPrimaryTablets(ctx context.Context, shards []*topo.ShardInfo) error {
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -230,33 +123,6 @@ func (wr *Wrangler) updateShardRecords(ctx context.Context, keyspace string, sha
return topotools.UpdateShardRecords(ctx, wr.ts, wr.tmc, keyspace, shards, cells, servedType, isFrom, clearSourceShards, wr.Logger())
}

// updateFrozenFlag sets or unsets the Frozen flag for primary migration. This is performed
// for all primary tablet control records.
func (wr *Wrangler) updateFrozenFlag(ctx context.Context, shards []*topo.ShardInfo, value bool) (err error) {
for i, si := range shards {
updatedShard, err := wr.ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error {
tc := si.GetTabletControl(topodatapb.TabletType_PRIMARY)
if tc != nil {
tc.Frozen = value
return nil
}
// This shard does not have a tablet control record, adding one to set frozen flag
tc = &topodatapb.Shard_TabletControl{
TabletType: topodatapb.TabletType_PRIMARY,
Frozen: value,
}
si.TabletControls = append(si.TabletControls, tc)
return nil
})
if err != nil {
return err
}

shards[i] = updatedShard
}
return nil
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
Expand Down
4 changes: 0 additions & 4 deletions go/vt/wrangler/testlib/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ func TestVersion(t *testing.T) {
}()
discovery.SetTabletPickerRetryDelay(5 * time.Millisecond)

// We need to run this test with the /debug/vars version of the
// plugin.
wrangler.ResetDebugVarsGetVersion()

// Initialize our environment
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
39 changes: 0 additions & 39 deletions go/vt/wrangler/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ limitations under the License.
package wrangler

import (
"encoding/json"
"fmt"
"io"
"net/http"

"context"

Expand All @@ -31,42 +28,6 @@ import (
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var getVersionFromTabletDebugVars = func(tabletAddr string) (string, error) {
resp, err := http.Get("http://" + tabletAddr + "/debug/vars")
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

var vars struct {
BuildHost string
BuildUser string
BuildTimestamp int64
BuildGitRev string
}
err = json.Unmarshal(body, &vars)
if err != nil {
return "", err
}

version := fmt.Sprintf("%v", vars)
return version, nil
}

var getVersionFromTablet = getVersionFromTabletDebugVars

// ResetDebugVarsGetVersion is used by tests to reset the
// getVersionFromTablet variable to the default one. That way we can
// run the unit tests in testlib/ even when another implementation of
// getVersionFromTablet is used.
func ResetDebugVarsGetVersion() {
getVersionFromTablet = getVersionFromTabletDebugVars
}

// GetVersion returns the version string from a tablet
func (wr *Wrangler) GetVersion(ctx context.Context, tabletAlias *topodatapb.TabletAlias) (string, error) {
resp, err := wr.VtctldServer().GetVersion(ctx, &vtctldatapb.GetVersionRequest{
Expand Down

0 comments on commit 1d36207

Please sign in to comment.