diff --git a/ddl/syncer.go b/ddl/syncer.go index fa0982c875e65..bfaf329f066dd 100644 --- a/ddl/syncer.go +++ b/ddl/syncer.go @@ -18,7 +18,9 @@ import ( "math" "strconv" "sync" + "sync/atomic" "time" + "unsafe" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" @@ -87,7 +89,7 @@ type SchemaSyncer interface { type schemaVersionSyncer struct { selfSchemaVerPath string etcdCli *clientv3.Client - session *concurrency.Session + session unsafe.Pointer mu struct { sync.RWMutex globalVerCh clientv3.WatchChan @@ -138,23 +140,33 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error { return errors.Trace(err) } logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath) - s.session, err = owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL) + session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL) if err != nil { return errors.Trace(err) } + s.storeSession(session) s.mu.Lock() s.mu.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion) s.mu.Unlock() err = s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion, - clientv3.WithLease(s.session.Lease())) + clientv3.WithLease(s.loadSession().Lease())) + return errors.Trace(err) } +func (s *schemaVersionSyncer) loadSession() *concurrency.Session { + return (*concurrency.Session)(atomic.LoadPointer(&s.session)) +} + +func (s *schemaVersionSyncer) storeSession(session *concurrency.Session) { + atomic.StorePointer(&s.session, (unsafe.Pointer)(session)) +} + // Done implements SchemaSyncer.Done interface. func (s *schemaVersionSyncer) Done() <-chan struct{} { - return s.session.Done() + return s.loadSession().Done() } // Restart implements SchemaSyncer.Restart interface. @@ -171,12 +183,12 @@ func (s *schemaVersionSyncer) Restart(ctx context.Context) error { if err != nil { return errors.Trace(err) } - s.session = session + s.storeSession(session) childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout) defer cancel() err = s.putKV(childCtx, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion, - clientv3.WithLease(s.session.Lease())) + clientv3.WithLease(s.loadSession().Lease())) return errors.Trace(err) } @@ -214,7 +226,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, version int startTime := time.Now() ver := strconv.FormatInt(version, 10) err := s.putKV(ctx, putKeyNoRetry, s.selfSchemaVerPath, ver, - clientv3.WithLease(s.session.Lease())) + clientv3.WithLease(s.loadSession().Lease())) metrics.UpdateSelfVersionHistogram.WithLabelValues(metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) return errors.Trace(err) diff --git a/domain/domain.go b/domain/domain.go index cc73b20af51c3..cf7b5b3022529 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -41,6 +41,7 @@ import ( log "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) // Domain represents a storage space. Different domains can use the same database name. @@ -351,13 +352,12 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) { case <-syncer.Done(): // The schema syncer stops, we need stop the schema validator to synchronize the schema version. log.Info("[ddl] reload schema in loop, schema syncer need restart") - do.SchemaValidator.Stop() err := do.mustRestartSyncer() if err != nil { log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err)) break } - do.SchemaValidator.Restart() + log.Info("[ddl] schema syncer restarted.") case <-do.exit: return } @@ -471,6 +471,12 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R DialOptions: []grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor), grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor), + grpc.WithBackoffMaxDelay(time.Second * 3), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(10) * time.Second, + Timeout: time.Duration(3) * time.Second, + PermitWithoutStream: true, + }), }, TLS: ebd.TLSConfig(), })