Skip to content

Commit

Permalink
add vtgate flag that explicitly allows vstream copy (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
pbibra authored Sep 12, 2023
1 parent e87c86a commit 68cf290
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 43 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ Usage of vtgate:
Select tcp, tcp4, or tcp6 to control the socket type. (default tcp)
--no_scatter
when set to true, the planner will fail instead of producing a plan that includes scatter queries
--no_vstream_copy
when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this
--normalize_queries
Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars. (default true)
--onclose_timeout duration
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts

streamSize := 10
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion)
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion, false)

return nil
}
Expand Down
31 changes: 18 additions & 13 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type Executor struct {

// allowScatter will fail planning if set to false and a plan contains any scatter queries
allowScatter bool
// allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream.
// This is temporary until RDONLYs are properly supported for bootstrapping.
allowVstreamCopy bool
}

var executorOnce sync.Once
Expand All @@ -127,20 +130,22 @@ func NewExecutor(
schemaTracker SchemaInfo,
noScatter bool,
pv plancontext.PlannerVersion,
noVstreamCopy bool,
) *Executor {
e := &Executor{
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
pv: pv,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
allowVstreamCopy: !noVstreamCopy,
pv: pv,
}

vschemaacl.Init()
Expand Down Expand Up @@ -1318,7 +1323,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar
return err
}

vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell)
vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell, e.allowVstreamCopy)
vs := &vstream{
vgtid: vgtid,
tabletType: topodatapb.TabletType_PRIMARY,
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn
bad.VSchema = badVSchema

getSandbox(KsTestUnsharded).VSchema = unshardedVSchema
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
// create a new session each time so that ShardSessions don't get re-used across tests
Expand All @@ -493,7 +493,7 @@ func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclo
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema

executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
// create a new session each time so that ShardSessions don't get re-used across tests
primarySession = &vtgatepb.Session{
TargetString: "@primary",
Expand Down Expand Up @@ -522,7 +522,7 @@ func createCustomExecutorSetValues(vschema string, values []*sqltypes.Result) (e
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema

executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
// create a new session each time so that ShardSessions don't get re-used across tests
primarySession = &vtgatepb.Session{
TargetString: "@primary",
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ func TestStreamSelectIN(t *testing.T) {
}

func createExecutor(serv *sandboxTopo, cell string, resolver *Resolver) *Executor {
return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
}

func TestSelectScatter(t *testing.T) {
Expand Down Expand Up @@ -2981,7 +2981,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) {
count++
}

executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
before := runtime.NumGoroutine()

query := "select id, col from user order by id limit 2"
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestStreamSQLSharded(t *testing.T) {
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)

sql := "stream * from sharded_user_msgs"
result, err := executorStreamMessages(executor, sql)
Expand Down
19 changes: 15 additions & 4 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"

Expand All @@ -47,6 +48,9 @@ type vstreamManager struct {
resolver *srvtopo.Resolver
toposerv srvtopo.Server
cell string
// allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream.
// This is temporary until RDONLYs are properly supported for bootstrapping.
allowVstreamCopy bool

vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
Expand Down Expand Up @@ -119,12 +123,13 @@ type journalEvent struct {
done chan struct{}
}

func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager {
exporter := servenv.NewExporter(cell, "VStreamManager")
return &vstreamManager{
resolver: resolver,
toposerv: serv,
cell: cell,
resolver: resolver,
toposerv: serv,
cell: cell,
allowVstreamCopy: allowVstreamCopy,
vstreamsCreated: exporter.NewCountersWithMultiLabels(
"VStreamsCreated",
"Number of vstreams created",
Expand Down Expand Up @@ -540,6 +545,12 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
log.Infof("Starting to vstream from %s", tablet.Alias.String())
// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
var vstreamCreatedOnce sync.Once

if !vs.vsm.allowVstreamCopy && (sgtid.Gtid == "" || len(sgtid.TablePKs) > 0) {
// We are attempting a vstream copy, but are not allowed (temporary until we can properly support RDONLYs for bootstrapping)
return vterrors.NewErrorf(vtrpc.Code_UNIMPLEMENTED, vterrors.NotSupportedYet, "vstream copy is not currently supported")
}

err = tabletConn.VStream(ctx, target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0
Expand Down
82 changes: 66 additions & 16 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestVStreamSkew(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}}
want := int64(0)
var sbc0, sbc1 *sandboxconn.SandboxConn
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestVStreamEvents(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})

vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())

Expand Down Expand Up @@ -213,7 +213,7 @@ func TestVStreamChunks(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestVStreamManagerGetCells(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
ts, _ := st.GetTopoServer()

for _, tcase := range tcases {
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestVStreamMulti(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -415,7 +415,7 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
vsm.vstreamsCreated.ResetAll()
vsm.vstreamsLag.ResetAll()
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestVStreamRetry(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)

st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
commit := []*binlogdatapb.VEvent{
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())

Expand Down Expand Up @@ -561,7 +561,7 @@ func TestVStreamJournalOneToMany(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestVStreamJournalManyToOne(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -791,7 +791,7 @@ func TestVStreamJournalNoMatch(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())

Expand Down Expand Up @@ -920,7 +920,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet())
sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -1000,7 +1000,7 @@ func TestResolveVStreamParams(t *testing.T) {
name := "TestVStream"
_ = createSandbox(name)
hc := discovery.NewFakeHealthCheck(nil)
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa", true)
testcases := []struct {
input *binlogdatapb.VGtid
output *binlogdatapb.VGtid
Expand Down Expand Up @@ -1146,7 +1146,7 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
vgtid := &binlogdatapb.VGtid{
Expand Down Expand Up @@ -1195,10 +1195,60 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
}
}

func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
func TestVstreamCopy(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cell := "aa"
ks := "TestVStreamCopy"
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)

st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
commit := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_COMMIT},
}
sbc0.AddVStreamEvents(commit, nil)
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa"))
sbc0.AddVStreamEvents(commit, nil)
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error"))
var count sync2.AtomicInt32
count.Set(0)
// empty gtid id means no start position = bootstrapping/vstream copy
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "-20",
Gtid: "",
}},
}

// allowVstreamCopy = false
vsm := newTestVStreamManager(hc, st, "aa", false)
err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
count.Add(1)
return nil
})
require.Error(t, err)
require.Equal(t, err.Error(), "vstream copy is not currently supported")

// allowVstreamCopy = true
vsm2 := newTestVStreamManager(hc, st, "aa", true)
err = vsm2.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
count.Add(1)
return nil
})
require.Equal(t, err.Error(), "final error")
}

func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager {
gw := NewTabletGateway(context.Background(), hc, serv, cell)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
return newVStreamManager(srvResolver, serv, cell)
return newVStreamManager(srvResolver, serv, cell, allowVstreamCopy)
}

func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid *binlogdatapb.VGtid, flags *vtgatepb.VStreamFlags) <-chan *binlogdatapb.VStreamResponse {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var (
defaultDDLStrategy = flag.String("ddl_strategy", string(schema.DDLStrategyDirect), "Set default strategy for DDL statements. Override with @@ddl_strategy session variable")
dbDDLPlugin = flag.String("dbddl_plugin", "fail", "controls how to handle CREATE/DROP DATABASE. use it if you are using your own database provisioning service")
noScatter = flag.Bool("no_scatter", false, "when set to true, the planner will fail instead of producing a plan that includes scatter queries")
noVstreamCopy = flag.Bool("no_vstream_copy", false, "when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this")

// TODO(deepthi): change these two vars to unexported and move to healthcheck.go when LegacyHealthcheck is removed

Expand Down Expand Up @@ -210,7 +211,7 @@ func Init(
sc := NewScatterConn("VttabletCall", tc, gw)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
resolver := NewResolver(srvResolver, serv, cell, sc)
vsm := newVStreamManager(srvResolver, serv, cell)
vsm := newVStreamManager(srvResolver, serv, cell, !*noVstreamCopy)

var si SchemaInfo // default nil
var st *vtschema.Tracker
Expand Down Expand Up @@ -238,6 +239,7 @@ func Init(
si,
*noScatter,
pv,
*noVstreamCopy,
)

// connect the schema tracker with the vschema manager
Expand Down
Loading

0 comments on commit 68cf290

Please sign in to comment.