From 74f245b9847119b59446bae4015ca65a480259ee Mon Sep 17 00:00:00 2001 From: chrisxu333 Date: Sun, 14 Jan 2024 17:29:23 -0700 Subject: [PATCH] persist cluster-enabled status in RocksDB --- kvrocks.conf | 7 ++++-- src/server/server.cc | 25 +++++++++++++++++++ src/server/server.h | 1 + src/storage/storage.h | 2 ++ .../integration/cluster/cluster_test.go | 8 ++++++ tests/gocase/util/server.go | 22 ++++++++++++++++ 6 files changed, 63 insertions(+), 2 deletions(-) diff --git a/kvrocks.conf b/kvrocks.conf index 7d0990738f1..3da4da6a055 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -43,8 +43,11 @@ daemonize no # If you enable cluster, kvrocks will encode key with its slot id calculated by # CRC16 and modulo 16384, encoding key with its slot id makes it efficient to # migrate keys based on the slot. So if you enabled at first time, cluster mode must -# not be disabled after restarting, and vice versa. That is to say, data is not -# compatible between standalone mode with cluster mode, you must migrate data +# not be disabled after restarting, and vice versa. Currently, kvrocks will keep +# using the cluster-enabled status that is persisted at first time, regardless of +# what cluster-enabled status is provided afterwards. +# Note that even if kvrocks has such protection, you should also be aware that data +# is not compatible between standalone mode with cluster mode, you must migrate data # if you want to change mode, otherwise, kvrocks will make data corrupt. # # Default: no diff --git a/src/server/server.cc b/src/server/server.cc index efe721b27ba..65954ed3b00 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -152,6 +152,11 @@ Status Server::Start() { } } + s = forceClusterMode(); + if (!s.IsOK()) { + return s; + } + if (config_->cluster_enabled) { if (config_->persist_cluster_nodes_enabled) { auto s = cluster->LoadClusterNodes(config_->NodesFilePath()); @@ -1839,6 +1844,26 @@ void Server::cleanupExitedWorkerThreads(bool force) { } } +Status Server::forceClusterMode() { + std::string value; + auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName); + rocksdb::Status check_cluster_enabled = + storage->Get(rocksdb::ReadOptions(), cf, rocksdb::Slice(engine::kClusterEnabledKey), &value); + + if (check_cluster_enabled.IsNotFound()) { + Status s = storage->WriteToPropagateCF(engine::kClusterEnabledKey, std::to_string(config_->cluster_enabled)); + if (!s.IsOK()) return s; + } else { + if (check_cluster_enabled.ok()) { + LOG(WARNING) << "cluster_enable status is inconsistent. Using the previously persisted one."; + config_->cluster_enabled = std::stoi(value); + } else { + return {Status::NotOK, "failed to load cluster_enable from storage: " + check_cluster_enabled.ToString()}; + } + } + return Status::OK(); +} + std::string ServerLogData::Encode() const { if (type_ == kReplIdLog) { return std::string(1, kReplIdTag) + " " + content_; diff --git a/src/server/server.h b/src/server/server.h index a86eedf1cd8..2283fc61cb1 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -311,6 +311,7 @@ class Server { void increaseWorkerThreads(size_t delta); void decreaseWorkerThreads(size_t delta); void cleanupExitedWorkerThreads(bool force); + Status forceClusterMode(); std::atomic stop_ = false; std::atomic is_loading_ = false; diff --git a/src/storage/storage.h b/src/storage/storage.h index 7e37d82f7ea..f07b1b4821e 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -80,6 +80,8 @@ constexpr const char *kLuaFuncSHAPrefix = "lua_f_"; constexpr const char *kLuaFuncLibPrefix = "lua_func_lib_"; constexpr const char *kLuaLibCodePrefix = "lua_lib_code_"; +const std::string kClusterEnabledKey = "config_cluster_enabled"; + struct CompressionOption { rocksdb::CompressionType type; const std::string name; diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go index 8bc42fdafdb..f76c537f745 100644 --- a/tests/gocase/integration/cluster/cluster_test.go +++ b/tests/gocase/integration/cluster/cluster_test.go @@ -93,6 +93,14 @@ func TestClusterNodes(t *testing.T) { require.EqualValues(t, []redis.ClusterNode{{ID: nodeID, Addr: srv.HostPort()}}, slots[0].Nodes) }) + t.Run("enable/disable cluster-enabled option", func(t *testing.T) { + // force change cluster-enabled status in kvrocks.conf file + srv.ForceChangeClusterMode(false) + defer srv.ForceChangeClusterMode(true) + srv.Restart() + require.NoError(t, rdb.Do(ctx, "clusterx", "version").Err()) + }) + t.Run("enable/disable the persist cluster nodes", func(t *testing.T) { require.NoError(t, rdb.ConfigSet(ctx, "persist-cluster-nodes-enabled", "yes").Err()) srv.Restart() diff --git a/tests/gocase/util/server.go b/tests/gocase/util/server.go index a3f4314e342..4bd28a896de 100644 --- a/tests/gocase/util/server.go +++ b/tests/gocase/util/server.go @@ -28,6 +28,7 @@ import ( "os/exec" "path/filepath" "regexp" + "strings" "sync" "syscall" "testing" @@ -134,6 +135,27 @@ func (s *KvrocksServer) close(keepDir bool) { s.clean(keepDir) } +func (s *KvrocksServer) ForceChangeClusterMode(enable bool) { + dir := s.configs["dir"] + f, err := os.OpenFile(filepath.Join(dir, "kvrocks.conf"), os.O_RDWR, 0666) + require.NoError(s.t, err) + defer func() { require.NoError(s.t, f.Close()) }() + + // change the line containing cluster-enabled to no + data, err := os.ReadFile(filepath.Join(dir, "kvrocks.conf")) + require.NoError(s.t, err) + + content := string(data) + var new_content string + if !enable { + new_content = strings.ReplaceAll(content, "cluster-enabled yes", "cluster-enabled no") + } else { + new_content = strings.ReplaceAll(content, "cluster-enabled no", "cluster-enabled yes") + } + err = os.WriteFile(filepath.Join(dir, "kvrocks.conf"), []byte(new_content), 0666) + require.NoError(s.t, err) +} + func (s *KvrocksServer) Restart() { s.close(true)