From 25f4af2e4d24a2be23585190c9e8487058c97315 Mon Sep 17 00:00:00 2001 From: winkyao Date: Fri, 28 Sep 2018 17:54:27 +0800 Subject: [PATCH 1/2] domain: fast new a etcd session when the session is stale in the schemaVersionSyncer #7774 --- ddl/syncer.go | 26 +++++++++++++++++++------- domain/domain.go | 10 ++++++++-- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/ddl/syncer.go b/ddl/syncer.go index 526ec3d737504..b6ae291860c30 100644 --- a/ddl/syncer.go +++ b/ddl/syncer.go @@ -17,7 +17,9 @@ import ( "fmt" "math" "strconv" + "sync/atomic" "time" + "unsafe" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" @@ -83,8 +85,8 @@ type SchemaSyncer interface { type schemaVersionSyncer struct { selfSchemaVerPath string etcdCli *clientv3.Client - session *concurrency.Session globalVerCh clientv3.WatchChan + session unsafe.Pointer } // NewSchemaSyncer creates a new SchemaSyncer. @@ -131,19 +133,29 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error { return errors.Trace(err) } logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath) - s.session, err = owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL) + session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL) if err != nil { return errors.Trace(err) } s.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion) err = s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion, - clientv3.WithLease(s.session.Lease())) + clientv3.WithLease(s.loadSession().Lease())) + s.storeSession(session) + return errors.Trace(err) } +func (s *schemaVersionSyncer) loadSession() *concurrency.Session { + return (*concurrency.Session)(atomic.LoadPointer(&s.session)) +} + +func (s *schemaVersionSyncer) storeSession(session *concurrency.Session) { + atomic.StorePointer(&s.session, (unsafe.Pointer)(session)) +} + // Done implements SchemaSyncer.Done interface. func (s *schemaVersionSyncer) Done() <-chan struct{} { - return s.session.Done() + return s.loadSession().Done() } // Restart implements SchemaSyncer.Restart interface. @@ -160,12 +172,12 @@ func (s *schemaVersionSyncer) Restart(ctx context.Context) error { if err != nil { return errors.Trace(err) } - s.session = session + s.storeSession(session) childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout) defer cancel() err = s.putKV(childCtx, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion, - clientv3.WithLease(s.session.Lease())) + clientv3.WithLease(s.loadSession().Lease())) return errors.Trace(err) } @@ -180,7 +192,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, version int startTime := time.Now() ver := strconv.FormatInt(version, 10) err := s.putKV(ctx, putKeyNoRetry, s.selfSchemaVerPath, ver, - clientv3.WithLease(s.session.Lease())) + clientv3.WithLease(s.loadSession().Lease())) metrics.UpdateSelfVersionHistogram.WithLabelValues(metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) return errors.Trace(err) diff --git a/domain/domain.go b/domain/domain.go index 41fd0bef3bd07..693a1c6902ccd 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -41,6 +41,7 @@ import ( log "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) // Domain represents a storage space. Different domains can use the same database name. @@ -346,13 +347,12 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) { case <-syncer.Done(): // The schema syncer stops, we need stop the schema validator to synchronize the schema version. log.Info("[ddl] reload schema in loop, schema syncer need restart") - do.SchemaValidator.Stop() err := do.mustRestartSyncer() if err != nil { log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err)) break } - do.SchemaValidator.Restart() + log.Info("[ddl] schema syncer restarted.") case <-do.exit: return } @@ -465,6 +465,12 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R DialOptions: []grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor), grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor), + grpc.WithBackoffMaxDelay(time.Second * 3), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(10) * time.Second, + Timeout: time.Duration(3) * time.Second, + PermitWithoutStream: true, + }), }, TLS: ebd.TLSConfig(), }) From 23f398d9f9fb0b0a86ee667d3a7356eee12359a2 Mon Sep 17 00:00:00 2001 From: winkyao Date: Mon, 8 Oct 2018 10:47:41 +0800 Subject: [PATCH 2/2] fix ci --- ddl/syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/syncer.go b/ddl/syncer.go index f357181bfad35..bfaf329f066dd 100644 --- a/ddl/syncer.go +++ b/ddl/syncer.go @@ -144,6 +144,7 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error { if err != nil { return errors.Trace(err) } + s.storeSession(session) s.mu.Lock() s.mu.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion) @@ -151,7 +152,6 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error { err = s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion, clientv3.WithLease(s.loadSession().Lease())) - s.storeSession(session) return errors.Trace(err) }