From 3af8fe337921832d188f6f4d7b5bba05b65cd333 Mon Sep 17 00:00:00 2001 From: "wenbin.shi" Date: Sun, 24 Mar 2019 20:07:46 -0700 Subject: [PATCH 01/19] clear store when persist failed --- server/server.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/server/server.go b/server/server.go index 8a32e5ca062..02ff7f6787b 100644 --- a/server/server.go +++ b/server/server.go @@ -536,6 +536,9 @@ func (s *Server) SetScheduleConfig(cfg ScheduleConfig) error { old := s.scheduleOpt.load() s.scheduleOpt.store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { + s.scheduleOpt.store(old) + log.Error("schedule config updated failed", + zap.Error(err)) return err } log.Info("schedule config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) @@ -557,6 +560,9 @@ func (s *Server) SetReplicationConfig(cfg ReplicationConfig) error { old := s.scheduleOpt.rep.load() s.scheduleOpt.rep.store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { + s.scheduleOpt.rep.store(old) + log.Error("replication config updated failed", + zap.Error(err)) return err } log.Info("replication config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) @@ -568,6 +574,9 @@ func (s *Server) SetPDServerConfig(cfg PDServerConfig) error { old := s.scheduleOpt.loadPDServerConfig() s.scheduleOpt.pdServerConfig.Store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { + s.scheduleOpt.pdServerConfig.Store(old) + log.Error("PDServer config updated failed", + zap.Error(err)) return err } log.Info("PD server config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) @@ -605,6 +614,10 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { old := s.scheduleOpt.ns[name].load() n.store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { + s.scheduleOpt.ns[name].store(old) + log.Error("namespace config updated failed", + zap.String("name", name), + zap.Error(err)) return err } log.Info("namespace config is updated", zap.String("name", name), zap.Reflect("new", cfg), zap.Reflect("old", old)) @@ -624,6 +637,10 @@ func (s *Server) DeleteNamespaceConfig(name string) error { cfg := n.load() delete(s.scheduleOpt.ns, name) if err := s.scheduleOpt.persist(s.kv); err != nil { + s.scheduleOpt.ns[name] = newNamespaceOption(cfg) + log.Error("namespace config deleted failed", + zap.String("name", name), + zap.Error(err)) return err } log.Info("namespace config is deleted", zap.String("name", name), zap.Reflect("config", *cfg)) @@ -636,6 +653,12 @@ func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error { s.scheduleOpt.SetLabelProperty(typ, labelKey, labelValue) err := s.scheduleOpt.persist(s.kv) if err != nil { + s.scheduleOpt.DeleteLabelProperty(typ, labelKey, labelValue) + log.Error("label property config updated failed", + zap.String("typ", typ), + zap.String("labelKey", labelKey), + zap.String("labelValue", labelValue), + zap.Error(err)) return err } log.Info("label property config is updated", zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig())) @@ -647,6 +670,12 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error { s.scheduleOpt.DeleteLabelProperty(typ, labelKey, labelValue) err := s.scheduleOpt.persist(s.kv) if err != nil { + s.scheduleOpt.SetLabelProperty(typ, labelKey, labelValue) + log.Error("label property config deleted failed", + zap.String("typ", typ), + zap.String("labelKey", labelKey), + zap.String("labelValue", labelValue), + zap.Error(err)) return err } log.Info("label property config is deleted", zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig())) @@ -664,9 +693,14 @@ func (s *Server) SetClusterVersion(v string) error { if err != nil { return err } + old := s.scheduleOpt.loadClusterVersion() s.scheduleOpt.SetClusterVersion(*version) err = s.scheduleOpt.persist(s.kv) if err != nil { + s.scheduleOpt.SetClusterVersion(old) + log.Error("cluster version set failed", + zap.String("version", v), + zap.Error(err)) return err } log.Info("cluster version is updated", zap.String("new-version", v)) From 7c020230e3d8755497af6f4f79f52578fc8ed254 Mon Sep 17 00:00:00 2001 From: "wenbin.shi" Date: Sun, 24 Mar 2019 20:14:17 -0700 Subject: [PATCH 02/19] log more info --- server/server.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/server/server.go b/server/server.go index 02ff7f6787b..5cdd9699b21 100644 --- a/server/server.go +++ b/server/server.go @@ -538,6 +538,8 @@ func (s *Server) SetScheduleConfig(cfg ScheduleConfig) error { if err := s.scheduleOpt.persist(s.kv); err != nil { s.scheduleOpt.store(old) log.Error("schedule config updated failed", + zap.Reflect("new", cfg), + zap.Reflect("old", old), zap.Error(err)) return err } @@ -562,6 +564,8 @@ func (s *Server) SetReplicationConfig(cfg ReplicationConfig) error { if err := s.scheduleOpt.persist(s.kv); err != nil { s.scheduleOpt.rep.store(old) log.Error("replication config updated failed", + zap.Reflect("new", cfg), + zap.Reflect("old", old), zap.Error(err)) return err } @@ -576,6 +580,8 @@ func (s *Server) SetPDServerConfig(cfg PDServerConfig) error { if err := s.scheduleOpt.persist(s.kv); err != nil { s.scheduleOpt.pdServerConfig.Store(old) log.Error("PDServer config updated failed", + zap.Reflect("new", cfg), + zap.Reflect("old", old), zap.Error(err)) return err } @@ -617,6 +623,8 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { s.scheduleOpt.ns[name].store(old) log.Error("namespace config updated failed", zap.String("name", name), + zap.Reflect("new", cfg), + zap.Reflect("old", old), zap.Error(err)) return err } @@ -658,6 +666,7 @@ func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error { zap.String("typ", typ), zap.String("labelKey", labelKey), zap.String("labelValue", labelValue), + zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig()), zap.Error(err)) return err } @@ -675,6 +684,7 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error { zap.String("typ", typ), zap.String("labelKey", labelKey), zap.String("labelValue", labelValue), + zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig()), zap.Error(err)) return err } From bc7725116cc1f21080e12cc0b80062144f25401d Mon Sep 17 00:00:00 2001 From: "wenbin.shi" Date: Sun, 24 Mar 2019 20:18:11 -0700 Subject: [PATCH 03/19] gofmt --- server/server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index 5cdd9699b21..2b477e22fff 100644 --- a/server/server.go +++ b/server/server.go @@ -709,7 +709,8 @@ func (s *Server) SetClusterVersion(v string) error { if err != nil { s.scheduleOpt.SetClusterVersion(old) log.Error("cluster version set failed", - zap.String("version", v), + zap.String("old-version", old.String()), + zap.String("new-version", v), zap.Error(err)) return err } From c677fd2aba0e5734f65696bcb19d471b620e0897 Mon Sep 17 00:00:00 2001 From: "wenbin.shi" Date: Sun, 24 Mar 2019 20:19:02 -0700 Subject: [PATCH 04/19] fix error info --- server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index 2b477e22fff..b4c9bb47e11 100644 --- a/server/server.go +++ b/server/server.go @@ -708,7 +708,7 @@ func (s *Server) SetClusterVersion(v string) error { err = s.scheduleOpt.persist(s.kv) if err != nil { s.scheduleOpt.SetClusterVersion(old) - log.Error("cluster version set failed", + log.Error("cluster version updated failed", zap.String("old-version", old.String()), zap.String("new-version", v), zap.Error(err)) From a77a9522c00b5254b5d8564fbb0e2b7ead54ce71 Mon Sep 17 00:00:00 2001 From: "wenbin.shi" Date: Wed, 27 Mar 2019 05:43:59 -0700 Subject: [PATCH 05/19] add unit_test --- server/cluster_test.go | 89 ++++++++++++++++++++++++++++++++++++++++++ server/server.go | 5 +++ 2 files changed, 94 insertions(+) diff --git a/server/cluster_test.go b/server/cluster_test.go index 5fc65bcebbb..d10b9048ec6 100644 --- a/server/cluster_test.go +++ b/server/cluster_test.go @@ -16,6 +16,7 @@ package server import ( "context" "fmt" + "github.com/pkg/errors" "strings" "sync" "time" @@ -44,6 +45,14 @@ type testClusterSuite struct { baseCluster } +type testErrorKV struct { + core.KVBase +} + +func (kv *testErrorKV) Save(key, value string) error { + return errors.New("save failed") +} + func mustNewGrpcClient(c *C, addr string) pdpb.PDClient { conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure()) @@ -600,3 +609,83 @@ func (s *testGetStoresSuite) BenchmarkGetStores(c *C) { s.cluster.core.Stores.GetStores() } } + +func (s *testClusterSuite) TestSetScheduleOpt(c *C) { + var err error + var cleanup func() + _, s.svr, cleanup, err = NewTestServer(c) + c.Assert(err, IsNil) + mustWaitLeader(c, []*Server{s.svr}) + s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr()) + defer cleanup() + clusterID := s.svr.clusterID + + storeAddr := "127.0.0.1:0" + _, err = s.svr.bootstrapCluster(s.newBootstrapRequest(c, clusterID, storeAddr)) + c.Assert(err, IsNil) + + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) + + scheduleCfg := opt.load() + replicateCfg := s.svr.GetReplicationConfig() + pdServerCfg := s.svr.scheduleOpt.loadPDServerConfig() + + //PUT GET DELETE successed + replicateCfg.MaxReplicas = 5 + scheduleCfg.MaxSnapshotCount = 10 + pdServerCfg.UseRegionStorage = true + typ, labelKey, labelValue := "testTyp", "testKey", "testValue" + nsConfig := NamespaceConfig{LeaderScheduleLimit: uint64(200)} + + c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), IsNil) + c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), IsNil) + c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), IsNil) + c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil) + c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil) + + c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5)) + c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10)) + c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true) + c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey") + c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue") + c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200)) + + c.Assert(s.svr.DeleteNamespaceConfig("testNS"), IsNil) + c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), IsNil) + + c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0)) + c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0) + + //PUT GET failed + oldKV := s.svr.kv + s.svr.kv = core.NewKV(&testErrorKV{}) + replicateCfg.MaxReplicas = 7 + scheduleCfg.MaxSnapshotCount = 20 + pdServerCfg.UseRegionStorage = false + + c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), NotNil) + c.Assert(s.svr.SetReplicationConfig(*replicateCfg), NotNil) + c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), NotNil) + c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), NotNil) + c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), NotNil) + + c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5)) + c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10)) + c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true) + c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0)) + c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0) + + //DELETE failed + s.svr.kv = oldKV + c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil) + c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil) + + s.svr.kv = core.NewKV(&testErrorKV{}) + c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), NotNil) + c.Assert(s.svr.DeleteNamespaceConfig("testNS"), NotNil) + + c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200)) + c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey") + c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue") +} diff --git a/server/server.go b/server/server.go index f09ad7fe40d..698fdc49c4d 100644 --- a/server/server.go +++ b/server/server.go @@ -632,6 +632,11 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { } else { s.scheduleOpt.ns[name] = newNamespaceOption(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { + delete(s.scheduleOpt.ns, name) + log.Error("namespace config added failed", + zap.String("name", name), + zap.Reflect("new", cfg), + zap.Error(err)) return err } log.Info("namespace config is added", zap.String("name", name), zap.Reflect("new", cfg)) From 3a758c3c481f2ad6cd9342ab6141b5122a46d0fe Mon Sep 17 00:00:00 2001 From: "wenbin.shi" Date: Wed, 27 Mar 2019 06:31:55 -0700 Subject: [PATCH 06/19] try to debug jenkins error --- server/systime_mon.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/systime_mon.go b/server/systime_mon.go index f8664be8aac..30df23ebc75 100644 --- a/server/systime_mon.go +++ b/server/systime_mon.go @@ -28,8 +28,11 @@ func StartMonitor(now func() time.Time, systimeErrHandler func()) { for { last := now().UnixNano() <-tick.C - if now().UnixNano() < last { - log.Error("system time jump backward", zap.Int64("last", last)) + now := now().UnixNano() + if now < last { + log.Error("system time jump backward", + zap.Int64("last", last), + zap.Int64("now", now)) systimeErrHandler() } } From 237f5a5fc3105be6d5e66d5a6cc1648b78d64c99 Mon Sep 17 00:00:00 2001 From: "wenbin.shi" Date: Wed, 27 Mar 2019 08:27:50 -0700 Subject: [PATCH 07/19] try to fix jenkins bug --- server/systime_mon_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/systime_mon_test.go b/server/systime_mon_test.go index b0740e09a69..a139e24580f 100644 --- a/server/systime_mon_test.go +++ b/server/systime_mon_test.go @@ -22,11 +22,11 @@ import ( func TestSystimeMonitor(t *testing.T) { var jumpForward int32 - trigged := false + var trigged int32 = 0 go StartMonitor( func() time.Time { - if !trigged { - trigged = true + if atomic.LoadInt32(&trigged) != 1 { + atomic.StoreInt32(&trigged, 1) return time.Now() } From 892d1b7ec7b485b6e6b92beb1be69f875f2799a6 Mon Sep 17 00:00:00 2001 From: "wenbin.shi" Date: Wed, 27 Mar 2019 09:02:56 -0700 Subject: [PATCH 08/19] try to fix jenkins error --- tests/server/region_syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server/region_syncer_test.go b/tests/server/region_syncer_test.go index a5ff7727b90..88882f8ddf5 100644 --- a/tests/server/region_syncer_test.go +++ b/tests/server/region_syncer_test.go @@ -68,7 +68,7 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { c.Assert(err, IsNil) } // ensure flush to region kv - time.Sleep(3 * time.Second) + time.Sleep(2 * time.Second) err = leaderServer.Stop() c.Assert(err, IsNil) cluster.WaitLeader() From 19a566b32f8063c14a8b2b2c201ac755c899ddfc Mon Sep 17 00:00:00 2001 From: "wenbin.shi" Date: Wed, 27 Mar 2019 10:05:06 -0700 Subject: [PATCH 09/19] fix data race bug --- server/server.go | 6 +++--- tests/server/region_syncer_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/server.go b/server/server.go index 698fdc49c4d..8c6d396b3d5 100644 --- a/server/server.go +++ b/server/server.go @@ -632,7 +632,7 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { } else { s.scheduleOpt.ns[name] = newNamespaceOption(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { - delete(s.scheduleOpt.ns, name) + s.scheduleOpt.ns[name].store(&NamespaceConfig{}) log.Error("namespace config added failed", zap.String("name", name), zap.Reflect("new", cfg), @@ -648,9 +648,9 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { func (s *Server) DeleteNamespaceConfig(name string) error { if n, ok := s.scheduleOpt.ns[name]; ok { cfg := n.load() - delete(s.scheduleOpt.ns, name) + s.scheduleOpt.ns[name].store(&NamespaceConfig{}) if err := s.scheduleOpt.persist(s.kv); err != nil { - s.scheduleOpt.ns[name] = newNamespaceOption(cfg) + s.scheduleOpt.ns[name].store(cfg) log.Error("namespace config deleted failed", zap.String("name", name), zap.Error(err)) diff --git a/tests/server/region_syncer_test.go b/tests/server/region_syncer_test.go index 88882f8ddf5..70824d9a3f0 100644 --- a/tests/server/region_syncer_test.go +++ b/tests/server/region_syncer_test.go @@ -67,7 +67,7 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { err = rc.HandleRegionHeartbeat(region) c.Assert(err, IsNil) } - // ensure flush to region kv + // leave more time to ensure flush to region kv time.Sleep(2 * time.Second) err = leaderServer.Stop() c.Assert(err, IsNil) From b5fbb9db4e2e9285a849b3b334988ddd8e8bd736 Mon Sep 17 00:00:00 2001 From: "wenbin.shi" Date: Wed, 27 Mar 2019 10:15:57 -0700 Subject: [PATCH 10/19] prolong time for region kv to flush --- tests/server/region_syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server/region_syncer_test.go b/tests/server/region_syncer_test.go index 70824d9a3f0..b0881a68280 100644 --- a/tests/server/region_syncer_test.go +++ b/tests/server/region_syncer_test.go @@ -113,7 +113,7 @@ func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) { c.Assert(err, IsNil) } // ensure flush to region kv - time.Sleep(3 * time.Second) + time.Sleep(5 * time.Second) // restart pd1 err = leaderServer.Stop() c.Assert(err, IsNil) From 5548cdc4ea1ff464b02ad06b906f3ff50a66439d Mon Sep 17 00:00:00 2001 From: bradyjoestar Date: Wed, 27 Mar 2019 10:17:59 -0700 Subject: [PATCH 11/19] Update region_syncer_test.go --- tests/server/region_syncer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/server/region_syncer_test.go b/tests/server/region_syncer_test.go index b0881a68280..3fb7b6092f5 100644 --- a/tests/server/region_syncer_test.go +++ b/tests/server/region_syncer_test.go @@ -67,8 +67,8 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { err = rc.HandleRegionHeartbeat(region) c.Assert(err, IsNil) } - // leave more time to ensure flush to region kv - time.Sleep(2 * time.Second) + // ensure flush to region kv + time.Sleep(5 * time.Second) err = leaderServer.Stop() c.Assert(err, IsNil) cluster.WaitLeader() From ea2da65c2c8f3a1756a7fb4deae55cb1e480a95f Mon Sep 17 00:00:00 2001 From: "wenbin.shi" Date: Thu, 28 Mar 2019 19:53:21 -0700 Subject: [PATCH 12/19] better output log --- server/server.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/server.go b/server/server.go index 8c6d396b3d5..e3c11791286 100644 --- a/server/server.go +++ b/server/server.go @@ -537,7 +537,7 @@ func (s *Server) SetScheduleConfig(cfg ScheduleConfig) error { s.scheduleOpt.store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { s.scheduleOpt.store(old) - log.Error("schedule config updated failed", + log.Error("failed to update schedule config", zap.Reflect("new", cfg), zap.Reflect("old", old), zap.Error(err)) @@ -563,7 +563,7 @@ func (s *Server) SetReplicationConfig(cfg ReplicationConfig) error { s.scheduleOpt.rep.store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { s.scheduleOpt.rep.store(old) - log.Error("replication config updated failed", + log.Error("failed to update replication config", zap.Reflect("new", cfg), zap.Reflect("old", old), zap.Error(err)) @@ -579,7 +579,7 @@ func (s *Server) SetPDServerConfig(cfg PDServerConfig) error { s.scheduleOpt.pdServerConfig.Store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { s.scheduleOpt.pdServerConfig.Store(old) - log.Error("PDServer config updated failed", + log.Error("failed to update PDServer config", zap.Reflect("new", cfg), zap.Reflect("old", old), zap.Error(err)) @@ -621,7 +621,7 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { n.store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { s.scheduleOpt.ns[name].store(old) - log.Error("namespace config updated failed", + log.Error("failed to update namespace config", zap.String("name", name), zap.Reflect("new", cfg), zap.Reflect("old", old), @@ -633,7 +633,7 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { s.scheduleOpt.ns[name] = newNamespaceOption(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { s.scheduleOpt.ns[name].store(&NamespaceConfig{}) - log.Error("namespace config added failed", + log.Error("failed to add namespace config", zap.String("name", name), zap.Reflect("new", cfg), zap.Error(err)) @@ -651,7 +651,7 @@ func (s *Server) DeleteNamespaceConfig(name string) error { s.scheduleOpt.ns[name].store(&NamespaceConfig{}) if err := s.scheduleOpt.persist(s.kv); err != nil { s.scheduleOpt.ns[name].store(cfg) - log.Error("namespace config deleted failed", + log.Error("failed to delete namespace config", zap.String("name", name), zap.Error(err)) return err @@ -667,7 +667,7 @@ func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error { err := s.scheduleOpt.persist(s.kv) if err != nil { s.scheduleOpt.DeleteLabelProperty(typ, labelKey, labelValue) - log.Error("label property config updated failed", + log.Error("failed to update label property config", zap.String("typ", typ), zap.String("labelKey", labelKey), zap.String("labelValue", labelValue), @@ -685,7 +685,7 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error { err := s.scheduleOpt.persist(s.kv) if err != nil { s.scheduleOpt.SetLabelProperty(typ, labelKey, labelValue) - log.Error("label property config deleted failed", + log.Error("failed to delete label property config", zap.String("typ", typ), zap.String("labelKey", labelKey), zap.String("labelValue", labelValue), @@ -713,7 +713,7 @@ func (s *Server) SetClusterVersion(v string) error { err = s.scheduleOpt.persist(s.kv) if err != nil { s.scheduleOpt.SetClusterVersion(old) - log.Error("cluster version updated failed", + log.Error("failed to update cluster version", zap.String("old-version", old.String()), zap.String("new-version", v), zap.Error(err)) From 300ac33cc084f2246f66ed51dcc96346058830ec Mon Sep 17 00:00:00 2001 From: "webb.shi" Date: Thu, 11 Apr 2019 04:52:53 -0400 Subject: [PATCH 13/19] convert 5s to 3s --- tests/server/region_syncer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/server/region_syncer_test.go b/tests/server/region_syncer_test.go index 3fb7b6092f5..a5ff7727b90 100644 --- a/tests/server/region_syncer_test.go +++ b/tests/server/region_syncer_test.go @@ -68,7 +68,7 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { c.Assert(err, IsNil) } // ensure flush to region kv - time.Sleep(5 * time.Second) + time.Sleep(3 * time.Second) err = leaderServer.Stop() c.Assert(err, IsNil) cluster.WaitLeader() @@ -113,7 +113,7 @@ func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) { c.Assert(err, IsNil) } // ensure flush to region kv - time.Sleep(5 * time.Second) + time.Sleep(3 * time.Second) // restart pd1 err = leaderServer.Stop() c.Assert(err, IsNil) From 28689dc880b9f8ec56bb0a6375cf1674d2b2d122 Mon Sep 17 00:00:00 2001 From: "webb.shi" Date: Wed, 17 Apr 2019 06:48:01 -0400 Subject: [PATCH 14/19] replace sync.map --- server/option.go | 62 +++++++++++++++++++++++++++++++++++++----------- server/server.go | 34 +++++++++++++++++--------- 2 files changed, 71 insertions(+), 25 deletions(-) diff --git a/server/option.go b/server/option.go index e51626cc102..748412d003a 100644 --- a/server/option.go +++ b/server/option.go @@ -15,6 +15,7 @@ package server import ( "reflect" + "sync" "sync/atomic" "time" @@ -28,7 +29,7 @@ import ( type scheduleOption struct { v atomic.Value rep *Replication - ns map[string]*namespaceOption + ns sync.Map // concurrent map[string]*namespaceOption labelProperty atomic.Value clusterVersion atomic.Value pdServerConfig atomic.Value @@ -37,10 +38,10 @@ type scheduleOption struct { func newScheduleOption(cfg *Config) *scheduleOption { o := &scheduleOption{} o.store(&cfg.Schedule) - o.ns = make(map[string]*namespaceOption) + o.ns = sync.Map{} for name, nsCfg := range cfg.Namespace { nsCfg := nsCfg - o.ns[name] = newNamespaceOption(&nsCfg) + o.ns.Store(name, newNamespaceOption(&nsCfg)) } o.rep = newReplication(&cfg.Replication) o.pdServerConfig.Store(&cfg.PDServerCfg) @@ -61,8 +62,17 @@ func (o *scheduleOption) GetReplication() *Replication { return o.rep } +func (o *scheduleOption) getNS(name string) (*namespaceOption, bool) { + if n, ok := o.ns.Load(name); ok { + if n, ok := n.(*namespaceOption); ok { + return n, true + } + } + return nil, false +} + func (o *scheduleOption) GetMaxReplicas(name string) int { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetMaxReplicas() } return o.rep.GetMaxReplicas() @@ -105,35 +115,35 @@ func (o *scheduleOption) GetMaxStoreDownTime() time.Duration { } func (o *scheduleOption) GetLeaderScheduleLimit(name string) uint64 { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetLeaderScheduleLimit() } return o.load().LeaderScheduleLimit } func (o *scheduleOption) GetRegionScheduleLimit(name string) uint64 { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetRegionScheduleLimit() } return o.load().RegionScheduleLimit } func (o *scheduleOption) GetReplicaScheduleLimit(name string) uint64 { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetReplicaScheduleLimit() } return o.load().ReplicaScheduleLimit } func (o *scheduleOption) GetMergeScheduleLimit(name string) uint64 { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetMergeScheduleLimit() } return o.load().MergeScheduleLimit } func (o *scheduleOption) GetHotRegionScheduleLimit(name string) uint64 { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetHotRegionScheduleLimit() } return o.load().HotRegionScheduleLimit @@ -273,9 +283,21 @@ func (o *scheduleOption) loadPDServerConfig() *PDServerConfig { func (o *scheduleOption) persist(kv *core.KV) error { namespaces := make(map[string]NamespaceConfig) - for name, ns := range o.ns { - namespaces[name] = *ns.load() + + f := func(k, v interface{}) bool { + var kstr string + var ok bool + if kstr, ok = k.(string); !ok { + return false + } + if ns, ok := v.(*namespaceOption); ok { + namespaces[kstr] = *ns.load() + return true + } + return false } + o.ns.Range(f) + cfg := &Config{ Schedule: *o.load(), Replication: *o.rep.load(), @@ -290,9 +312,21 @@ func (o *scheduleOption) persist(kv *core.KV) error { func (o *scheduleOption) reload(kv *core.KV) error { namespaces := make(map[string]NamespaceConfig) - for name, ns := range o.ns { - namespaces[name] = *ns.load() + + f := func(k, v interface{}) bool { + var kstr string + var ok bool + if kstr, ok = k.(string); !ok { + return false + } + if ns, ok := v.(*namespaceOption); ok { + namespaces[kstr] = *ns.load() + return true + } + return false } + o.ns.Range(f) + cfg := &Config{ Schedule: *o.load().clone(), Replication: *o.rep.load(), @@ -311,7 +345,7 @@ func (o *scheduleOption) reload(kv *core.KV) error { o.rep.store(&cfg.Replication) for name, nsCfg := range cfg.Namespace { nsCfg := nsCfg - o.ns[name] = newNamespaceOption(&nsCfg) + o.ns.Store(name, newNamespaceOption(&nsCfg)) } o.labelProperty.Store(cfg.LabelProperty) o.clusterVersion.Store(cfg.ClusterVersion) diff --git a/server/server.go b/server/server.go index e3c11791286..124807197cb 100644 --- a/server/server.go +++ b/server/server.go @@ -511,9 +511,21 @@ func (s *Server) GetConfig() *Config { cfg.Schedule = *s.scheduleOpt.load() cfg.Replication = *s.scheduleOpt.rep.load() namespaces := make(map[string]NamespaceConfig) - for name, opt := range s.scheduleOpt.ns { - namespaces[name] = *opt.load() + + f := func(k, v interface{}) bool { + var kstr string + var ok bool + if kstr, ok = k.(string); !ok { + return false + } + if ns, ok := v.(*namespaceOption); ok { + namespaces[kstr] = *ns.load() + return true + } + return false } + s.scheduleOpt.ns.Range(f) + cfg.Namespace = namespaces cfg.LabelProperty = s.scheduleOpt.loadLabelPropertyConfig().clone() cfg.ClusterVersion = s.scheduleOpt.loadClusterVersion() @@ -591,7 +603,7 @@ func (s *Server) SetPDServerConfig(cfg PDServerConfig) error { // GetNamespaceConfig get the namespace config. func (s *Server) GetNamespaceConfig(name string) *NamespaceConfig { - if _, ok := s.scheduleOpt.ns[name]; !ok { + if _, ok := s.scheduleOpt.getNS(name); !ok { return &NamespaceConfig{} } @@ -616,11 +628,11 @@ func (s *Server) GetNamespaceConfigWithAdjust(name string) *NamespaceConfig { // SetNamespaceConfig sets the namespace config. func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { - if n, ok := s.scheduleOpt.ns[name]; ok { - old := s.scheduleOpt.ns[name].load() + if n, ok := s.scheduleOpt.getNS(name); ok { + old := n.load() n.store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { - s.scheduleOpt.ns[name].store(old) + s.scheduleOpt.ns.Store(name, old) log.Error("failed to update namespace config", zap.String("name", name), zap.Reflect("new", cfg), @@ -630,9 +642,9 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { } log.Info("namespace config is updated", zap.String("name", name), zap.Reflect("new", cfg), zap.Reflect("old", old)) } else { - s.scheduleOpt.ns[name] = newNamespaceOption(&cfg) + s.scheduleOpt.ns.Store(name, newNamespaceOption(&cfg)) if err := s.scheduleOpt.persist(s.kv); err != nil { - s.scheduleOpt.ns[name].store(&NamespaceConfig{}) + s.scheduleOpt.ns.Delete(name) log.Error("failed to add namespace config", zap.String("name", name), zap.Reflect("new", cfg), @@ -646,11 +658,11 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { // DeleteNamespaceConfig deletes the namespace config. func (s *Server) DeleteNamespaceConfig(name string) error { - if n, ok := s.scheduleOpt.ns[name]; ok { + if n, ok := s.scheduleOpt.getNS(name); ok { cfg := n.load() - s.scheduleOpt.ns[name].store(&NamespaceConfig{}) + s.scheduleOpt.ns.Delete(name) if err := s.scheduleOpt.persist(s.kv); err != nil { - s.scheduleOpt.ns[name].store(cfg) + s.scheduleOpt.ns.Store(name, cfg) log.Error("failed to delete namespace config", zap.String("name", name), zap.Error(err)) From bd2754c7d7c34f70cb92f648b46f953c925f389c Mon Sep 17 00:00:00 2001 From: "webb.shi" Date: Wed, 17 Apr 2019 07:33:28 -0400 Subject: [PATCH 15/19] fix ci bug --- server/cluster_test.go | 1 + server/server.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/cluster_test.go b/server/cluster_test.go index d10b9048ec6..73c8caf3251 100644 --- a/server/cluster_test.go +++ b/server/cluster_test.go @@ -679,6 +679,7 @@ func (s *testClusterSuite) TestSetScheduleOpt(c *C) { //DELETE failed s.svr.kv = oldKV c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil) + c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200)) c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil) s.svr.kv = core.NewKV(&testErrorKV{}) diff --git a/server/server.go b/server/server.go index 124807197cb..6c8be742bf7 100644 --- a/server/server.go +++ b/server/server.go @@ -632,7 +632,7 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { old := n.load() n.store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { - s.scheduleOpt.ns.Store(name, old) + s.scheduleOpt.ns.Store(name, newNamespaceOption(old)) log.Error("failed to update namespace config", zap.String("name", name), zap.Reflect("new", cfg), @@ -662,7 +662,7 @@ func (s *Server) DeleteNamespaceConfig(name string) error { cfg := n.load() s.scheduleOpt.ns.Delete(name) if err := s.scheduleOpt.persist(s.kv); err != nil { - s.scheduleOpt.ns.Store(name, cfg) + s.scheduleOpt.ns.Store(name, newNamespaceOption(cfg)) log.Error("failed to delete namespace config", zap.String("name", name), zap.Error(err)) From 2f777ccf08d80a15476fa702c289e3e5da0b907e Mon Sep 17 00:00:00 2001 From: "webb.shi" Date: Wed, 17 Apr 2019 07:42:42 -0400 Subject: [PATCH 16/19] rebuild jenkins --- server/cluster_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/cluster_test.go b/server/cluster_test.go index 73c8caf3251..51dfd12dd48 100644 --- a/server/cluster_test.go +++ b/server/cluster_test.go @@ -679,11 +679,11 @@ func (s *testClusterSuite) TestSetScheduleOpt(c *C) { //DELETE failed s.svr.kv = oldKV c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil) - c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200)) c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil) s.svr.kv = core.NewKV(&testErrorKV{}) c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), NotNil) + c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200)) c.Assert(s.svr.DeleteNamespaceConfig("testNS"), NotNil) c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200)) From 4b64562d4fd2a636547df698db3d4c07f0cf96ef Mon Sep 17 00:00:00 2001 From: "webb.shi" Date: Tue, 21 May 2019 04:43:28 -0400 Subject: [PATCH 17/19] extract function --- server/cluster_test.go | 2 +- server/option.go | 51 +++++++++++++++++------------------------- server/server.go | 16 +------------ 3 files changed, 23 insertions(+), 46 deletions(-) diff --git a/server/cluster_test.go b/server/cluster_test.go index 51dfd12dd48..a7f40e7e131 100644 --- a/server/cluster_test.go +++ b/server/cluster_test.go @@ -16,7 +16,6 @@ package server import ( "context" "fmt" - "github.com/pkg/errors" "strings" "sync" "time" @@ -26,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server/core" + "github.com/pkg/errors" "google.golang.org/grpc" ) diff --git a/server/option.go b/server/option.go index 748412d003a..f7f137246e6 100644 --- a/server/option.go +++ b/server/option.go @@ -71,6 +71,25 @@ func (o *scheduleOption) getNS(name string) (*namespaceOption, bool) { return nil, false } +func (o *scheduleOption) loadNSConfig() map[string]NamespaceConfig { + namespaces := make(map[string]NamespaceConfig) + f := func(k, v interface{}) bool { + var kstr string + var ok bool + if kstr, ok = k.(string); !ok { + return false + } + if ns, ok := v.(*namespaceOption); ok { + namespaces[kstr] = *ns.load() + return true + } + return false + } + o.ns.Range(f) + + return namespaces +} + func (o *scheduleOption) GetMaxReplicas(name string) int { if n, ok := o.getNS(name); ok { return n.GetMaxReplicas() @@ -282,21 +301,7 @@ func (o *scheduleOption) loadPDServerConfig() *PDServerConfig { } func (o *scheduleOption) persist(kv *core.KV) error { - namespaces := make(map[string]NamespaceConfig) - - f := func(k, v interface{}) bool { - var kstr string - var ok bool - if kstr, ok = k.(string); !ok { - return false - } - if ns, ok := v.(*namespaceOption); ok { - namespaces[kstr] = *ns.load() - return true - } - return false - } - o.ns.Range(f) + namespaces := o.loadNSConfig() cfg := &Config{ Schedule: *o.load(), @@ -311,21 +316,7 @@ func (o *scheduleOption) persist(kv *core.KV) error { } func (o *scheduleOption) reload(kv *core.KV) error { - namespaces := make(map[string]NamespaceConfig) - - f := func(k, v interface{}) bool { - var kstr string - var ok bool - if kstr, ok = k.(string); !ok { - return false - } - if ns, ok := v.(*namespaceOption); ok { - namespaces[kstr] = *ns.load() - return true - } - return false - } - o.ns.Range(f) + namespaces := o.loadNSConfig() cfg := &Config{ Schedule: *o.load().clone(), diff --git a/server/server.go b/server/server.go index 3e33096dcc5..a0ad8cef248 100644 --- a/server/server.go +++ b/server/server.go @@ -510,21 +510,7 @@ func (s *Server) GetConfig() *Config { cfg := s.cfg.clone() cfg.Schedule = *s.scheduleOpt.load() cfg.Replication = *s.scheduleOpt.rep.load() - namespaces := make(map[string]NamespaceConfig) - - f := func(k, v interface{}) bool { - var kstr string - var ok bool - if kstr, ok = k.(string); !ok { - return false - } - if ns, ok := v.(*namespaceOption); ok { - namespaces[kstr] = *ns.load() - return true - } - return false - } - s.scheduleOpt.ns.Range(f) + namespaces:= s.scheduleOpt.loadNSConfig() cfg.Namespace = namespaces cfg.LabelProperty = s.scheduleOpt.loadLabelPropertyConfig().clone() From 0ec54241ad570767d8a6cc4986f58a102af6f47d Mon Sep 17 00:00:00 2001 From: "webb.shi" Date: Tue, 21 May 2019 04:45:26 -0400 Subject: [PATCH 18/19] go fmt --- server/option.go | 2 +- server/server.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/option.go b/server/option.go index f7f137246e6..9e301db710d 100644 --- a/server/option.go +++ b/server/option.go @@ -86,7 +86,7 @@ func (o *scheduleOption) loadNSConfig() map[string]NamespaceConfig { return false } o.ns.Range(f) - + return namespaces } diff --git a/server/server.go b/server/server.go index a0ad8cef248..f538a17f1de 100644 --- a/server/server.go +++ b/server/server.go @@ -510,7 +510,7 @@ func (s *Server) GetConfig() *Config { cfg := s.cfg.clone() cfg.Schedule = *s.scheduleOpt.load() cfg.Replication = *s.scheduleOpt.rep.load() - namespaces:= s.scheduleOpt.loadNSConfig() + namespaces := s.scheduleOpt.loadNSConfig() cfg.Namespace = namespaces cfg.LabelProperty = s.scheduleOpt.loadLabelPropertyConfig().clone() From 4db8982c23056744d9574fd5202c1fc3a5d7a5eb Mon Sep 17 00:00:00 2001 From: "webb.shi" Date: Tue, 21 May 2019 04:53:34 -0400 Subject: [PATCH 19/19] revert system_mon --- server/systime_mon.go | 7 ++----- server/systime_mon_test.go | 6 +++--- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/server/systime_mon.go b/server/systime_mon.go index 30df23ebc75..f8664be8aac 100644 --- a/server/systime_mon.go +++ b/server/systime_mon.go @@ -28,11 +28,8 @@ func StartMonitor(now func() time.Time, systimeErrHandler func()) { for { last := now().UnixNano() <-tick.C - now := now().UnixNano() - if now < last { - log.Error("system time jump backward", - zap.Int64("last", last), - zap.Int64("now", now)) + if now().UnixNano() < last { + log.Error("system time jump backward", zap.Int64("last", last)) systimeErrHandler() } } diff --git a/server/systime_mon_test.go b/server/systime_mon_test.go index a139e24580f..b0740e09a69 100644 --- a/server/systime_mon_test.go +++ b/server/systime_mon_test.go @@ -22,11 +22,11 @@ import ( func TestSystimeMonitor(t *testing.T) { var jumpForward int32 - var trigged int32 = 0 + trigged := false go StartMonitor( func() time.Time { - if atomic.LoadInt32(&trigged) != 1 { - atomic.StoreInt32(&trigged, 1) + if !trigged { + trigged = true return time.Now() }