Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: fast new a etcd session when the session is stale in the schemaVersionSyncer #7774 #7810

Merged
merged 3 commits into from
Oct 8, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"math"
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
Expand Down Expand Up @@ -87,7 +89,7 @@ type SchemaSyncer interface {
type schemaVersionSyncer struct {
selfSchemaVerPath string
etcdCli *clientv3.Client
session *concurrency.Session
session unsafe.Pointer
mu struct {
sync.RWMutex
globalVerCh clientv3.WatchChan
Expand Down Expand Up @@ -138,7 +140,7 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error {
return errors.Trace(err)
}
logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath)
s.session, err = owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL)
session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -148,13 +150,23 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error {
s.mu.Unlock()

err = s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))
s.storeSession(session)

return errors.Trace(err)
}

func (s *schemaVersionSyncer) loadSession() *concurrency.Session {
return (*concurrency.Session)(atomic.LoadPointer(&s.session))
}

func (s *schemaVersionSyncer) storeSession(session *concurrency.Session) {
atomic.StorePointer(&s.session, (unsafe.Pointer)(session))
}

// Done implements SchemaSyncer.Done interface.
func (s *schemaVersionSyncer) Done() <-chan struct{} {
return s.session.Done()
return s.loadSession().Done()
}

// Restart implements SchemaSyncer.Restart interface.
Expand All @@ -171,12 +183,12 @@ func (s *schemaVersionSyncer) Restart(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
s.session = session
s.storeSession(session)

childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
defer cancel()
err = s.putKV(childCtx, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))

return errors.Trace(err)
}
Expand Down Expand Up @@ -214,7 +226,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, version int
startTime := time.Now()
ver := strconv.FormatInt(version, 10)
err := s.putKV(ctx, putKeyNoRetry, s.selfSchemaVerPath, ver,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))

metrics.UpdateSelfVersionHistogram.WithLabelValues(metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
Expand Down
10 changes: 8 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

// Domain represents a storage space. Different domains can use the same database name.
Expand Down Expand Up @@ -351,13 +352,12 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
case <-syncer.Done():
// The schema syncer stops, we need stop the schema validator to synchronize the schema version.
log.Info("[ddl] reload schema in loop, schema syncer need restart")
do.SchemaValidator.Stop()
err := do.mustRestartSyncer()
if err != nil {
log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err))
break
}
do.SchemaValidator.Restart()
log.Info("[ddl] schema syncer restarted.")
case <-do.exit:
return
}
Expand Down Expand Up @@ -471,6 +471,12 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
grpc.WithBackoffMaxDelay(time.Second * 3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(10) * time.Second,
Timeout: time.Duration(3) * time.Second,
PermitWithoutStream: true,
}),
},
TLS: ebd.TLSConfig(),
})
Expand Down