From 97120bdecf94d8d8b11b3bb1e26b174cd5198a9f Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 3 Nov 2023 08:58:44 -0700 Subject: [PATCH] dynamically connect to a target specified in client attrs --- go/vt/vtgateproxy/vtgateproxy.go | 70 +++++++++++++++++--------------- 1 file changed, 38 insertions(+), 32 deletions(-) diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index d1a336566de..b27d583bead 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -23,12 +23,12 @@ import ( "flag" "fmt" "io" + "sync" "time" "google.golang.org/grpc" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/grpcclient" - "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/schema" @@ -38,49 +38,66 @@ import ( ) var ( - target = flag.String("target", "", "vtgate host:port target used to dial the GRPC connection") dialTimeout = flag.Duration("dial_timeout", 5*time.Second, "dialer timeout for the GRPC connection") defaultDDLStrategy = flag.String("ddl_strategy", string(schema.DDLStrategyDirect), "Set default strategy for DDL statements. Override with @@ddl_strategy session variable") sysVarSetEnabled = flag.Bool("enable_system_settings", true, "This will enable the system settings to be changed per session at the database connection level") - vtGateProxy *VTGateProxy = &VTGateProxy{} + vtGateProxy *VTGateProxy = &VTGateProxy{ + targetConns: map[string]*vtgateconn.VTGateConn{}, + mu: sync.Mutex{}, + } ) type VTGateProxy struct { - conn *vtgateconn.VTGateConn + targetConns map[string]*vtgateconn.VTGateConn + mu sync.Mutex } -func (proxy *VTGateProxy) connect(ctx context.Context) error { - grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { - return append(opts, grpc.WithBlock()), nil - }) +func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vtgateconn.VTGateConn, error) { + // If the connection exists, return it + proxy.mu.Lock() + conn, _ := proxy.targetConns[target] + if conn != nil { + proxy.mu.Unlock() + return conn, nil + } + proxy.mu.Unlock() + + // Otherwise create a new connection after dropping the lock, allowing multiple requests to + // race to create the conn for now. + // grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { + // return append(opts, grpc.WithBlock()), nil + // }) grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil }) - conn, err := vtgateconn.DialProtocol(ctx, "grpc", *target) + conn, err := vtgateconn.DialProtocol(ctx, "grpc", target) if err != nil { - return err + return nil, err } - proxy.conn = conn - return nil + proxy.mu.Lock() + proxy.targetConns[target] = conn + proxy.mu.Unlock() + + return conn, nil } -func (proxy *VTGateProxy) NewSession(options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*vtgateconn.VTGateSession, error) { - if proxy.conn == nil { - return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "not connnected") +func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*vtgateconn.VTGateSession, error) { + target, ok := connectionAttributes["target"] + if !ok { + return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "no target string supplied by client") } - target, ok := connectionAttributes["target"] - if ok { - fmt.Printf("Creating new session from upstream provided target string: %v\n", target) + conn, err := proxy.getConnection(ctx, target) + if err != nil { + return nil, err } - // XXX/demmer handle schemaName? - return proxy.conn.Session("", options), nil + return conn.Session("", options), nil } // CloseSession closes the session, rolling back any implicit transactions. This has the @@ -128,16 +145,5 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn return nil } -func Init() error { - // XXX maybe add connect timeout? - ctx, cancel := context.WithTimeout(context.Background(), *dialTimeout) - defer cancel() - err := vtGateProxy.connect(ctx) - if err != nil { - log.Fatalf("error connecting to vtgate: %v", err) - return err - } - log.Infof("Connected to VTGate at %s", *target) - - return nil +func Init() { }