Skip to content

Commit

Permalink
Fix config disaster (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Oct 15, 2023
1 parent 4ca9b5c commit 6df6b41
Show file tree
Hide file tree
Showing 9 changed files with 1,213 additions and 30 deletions.
38 changes: 38 additions & 0 deletions examples/configuration/fallback/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
module github.com/polarismesh/polaris-go-configuration

go 1.17

require github.com/polarismesh/polaris-go v1.4.3

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dlclark/regexp2 v1.7.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/polarismesh/specification v1.4.0 // indirect
github.com/prometheus/client_golang v1.12.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
google.golang.org/genproto v0.0.0-20221014213838-99cd37c6964a // indirect
google.golang.org/grpc v1.51.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/polarismesh/polaris-go => ../../../
967 changes: 967 additions & 0 deletions examples/configuration/fallback/go.sum

Large diffs are not rendered by default.

66 changes: 66 additions & 0 deletions examples/configuration/fallback/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Tencent is pleased to support the open source community by making polaris-go available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package main

Check warning on line 18 in examples/configuration/fallback/main.go

View workflow job for this annotation

GitHub Actions / Run Revive Action (1.16.x)

should have a package comment

Check warning on line 18 in examples/configuration/fallback/main.go

View workflow job for this annotation

GitHub Actions / Run Revive Action (1.20.x)

should have a package comment

Check warning on line 18 in examples/configuration/fallback/main.go

View workflow job for this annotation

GitHub Actions / Run Revive Action (1.18.x)

should have a package comment

Check warning on line 18 in examples/configuration/fallback/main.go

View workflow job for this annotation

GitHub Actions / Run Revive Action (1.17.x)

should have a package comment

import (
"fmt"
"log"

"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/pkg/model"
)

func main() {
configAPI, err := polaris.NewConfigAPI()

if err != nil {
fmt.Println("fail to start example.", err)
return
}

// 获取远程的配置文件
namespace := "default"
fileGroup := "polaris-config-example"
fileName := "example.yaml"

configFile, err := configAPI.GetConfigFile(namespace, fileGroup, fileName)
if err != nil {
log.Println("fail to get config.", err)
return
}

// 打印配置文件内容
log.Println(configFile.GetContent())

// 方式一:添加监听器
configFile.AddChangeListener(changeListener)

// 方式二:添加监听器
changeChan := configFile.AddChangeListenerWithChannel()

for {
select {
case event := <-changeChan:
log.Printf("received change event by channel. %+v", event)
}
}
}

func changeListener(event model.ConfigFileChangeEvent) {
log.Printf("received change event. %+v", event)
}
30 changes: 30 additions & 0 deletions examples/configuration/fallback/polaris.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
global:
serverConnector:
addresses:
- 127.0.0.1:8091
config:
# 本地缓存配置
localCache:
#描述: 配置文件持久化到本地开关
persistEnable: true
#描述: 配置文件持久化目录,SDK在配置文件变更后,把相关的配置持久化到本地磁盘
persistDir: ./polaris/backup/config
#描述: 配置文件写盘失败的最大重试次数
persistMaxWriteRetry: 1
#描述: 配置文件从磁盘读取失败的最大重试次数
persistMaxReadRetry: 0
#描述: 缓存读写磁盘的重试间隔
persistRetryInterval: 500ms
#描述: 远端获取配置文件失败,兜底降级到本地文件缓存
fallbackToLocalCache: true
configConnector:
addresses:
- 127.0.0.1:8093
configFilter:
enable: true
chain:
- crypto
plugin:
crypto:
entries:
- name: AES
8 changes: 5 additions & 3 deletions pkg/flow/configuration/config_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,19 +263,21 @@ func (c *ConfigFileFlow) startCheckVersionTask(ctx context.Context) {
continue
}

remoteConfigFile := repo.loadRemoteFile()

// 从服务端获取的配置文件版本号落后于通知的版本号,重新拉取配置
if !(repo.remoteConfigFile == nil || repo.GetNotifiedVersion() > repo.remoteConfigFile.GetVersion()) {
if !(remoteConfigFile == nil || repo.GetNotifiedVersion() > remoteConfigFile.GetVersion()) {
continue
}

if repo.remoteConfigFile == nil {
if remoteConfigFile == nil {
log.GetBaseLogger().Warnf("[Config] client does not pull the configuration, it will be pulled again."+
"file = %+v, notified version = %d",
repo.configFileMetadata, repo.notifiedVersion)
} else {
log.GetBaseLogger().Warnf("[Config] notified version greater than pulled version, will pull config file again. "+
"file = %+v, notified version = %d, pulled version = %d",
repo.configFileMetadata, repo.notifiedVersion, repo.remoteConfigFile.GetVersion())
repo.configFileMetadata, repo.notifiedVersion, remoteConfigFile.GetVersion())
}

if err := repo.pull(); err != nil {
Expand Down
129 changes: 102 additions & 27 deletions pkg/flow/configuration/file_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package configuration
import (
"fmt"
"net/url"
"sync/atomic"
"time"

apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
Expand All @@ -37,19 +38,30 @@ const (
delayMaxTime = 120 // 120s
)

var (
_notExistFile = &configconnector.ConfigFile{
SourceContent: NotExistedFileContent,
NotExist: true,
}
)

// ConfigFileRepo 服务端配置文件代理类,从服务端拉取配置并同步数据
type ConfigFileRepo struct {
connector configconnector.ConfigConnector
chain configfilter.Chain
configuration config.Configuration

configFileMetadata model.ConfigFileMetadata
notifiedVersion uint64 // 长轮询通知的版本号
remoteConfigFile *configconnector.ConfigFile // 从服务端获取的原始配置对象
retryPolicy retryPolicy
listeners []ConfigFileRepoChangeListener
// 长轮询通知的版本号
notifiedVersion uint64
// 从服务端获取的原始配置对象 *configconnector.ConfigFile
remoteConfigFileRef *atomic.Value
retryPolicy retryPolicy
listeners []ConfigFileRepoChangeListener

persistHandler *CachePersistHandler

fallbackToLocalCache bool
}

// ConfigFileRepoChangeListener 远程配置文件发布监听器
Expand All @@ -71,14 +83,16 @@ func newConfigFileRepo(metadata model.ConfigFileMetadata,
delayMinTime: delayMinTime,
delayMaxTime: delayMaxTime,
},
remoteConfigFile: &configconnector.ConfigFile{
Namespace: metadata.GetNamespace(),
FileGroup: metadata.GetFileGroup(),
FileName: metadata.GetFileName(),
Version: initVersion,
},
persistHandler: persistHandler,
remoteConfigFileRef: &atomic.Value{},
persistHandler: persistHandler,
fallbackToLocalCache: configuration.GetConfigFile().GetLocalCache().IsFallbackToLocalCache(),
}
repo.remoteConfigFileRef.Store(&configconnector.ConfigFile{
Namespace: metadata.GetNamespace(),
FileGroup: metadata.GetFileGroup(),
FileName: metadata.GetFileName(),
Version: initVersion,
})
// 1. 同步从服务端拉取配置
if err := repo.pull(); err != nil {
return nil, err
Expand All @@ -90,26 +104,37 @@ func (r *ConfigFileRepo) GetNotifiedVersion() uint64 {
return r.notifiedVersion
}

func (r *ConfigFileRepo) loadRemoteFile() *configconnector.ConfigFile {
val := r.remoteConfigFileRef.Load()
if val == nil {
return nil
}
return val.(*configconnector.ConfigFile)
}

// GetContent 获取配置文件内容
func (r *ConfigFileRepo) GetContent() string {
if r.remoteConfigFile == nil {
remoteFile := r.loadRemoteFile()
if remoteFile == nil {
return NotExistedFileContent
}
return r.remoteConfigFile.GetContent()
return remoteFile.GetContent()
}

func (r *ConfigFileRepo) getVersion() uint64 {
if r.remoteConfigFile == nil {
remoteConfigFile := r.loadRemoteFile()
if remoteConfigFile == nil {
return initVersion
}
return r.remoteConfigFile.GetVersion()
return remoteConfigFile.GetVersion()
}

func (r *ConfigFileRepo) getDataKey() string {
if r.remoteConfigFile == nil {
remoteConfigFile := r.loadRemoteFile()
if remoteConfigFile == nil {
return ""
}
return r.remoteConfigFile.GetDataKey()
return remoteConfigFile.GetDataKey()
}

func (r *ConfigFileRepo) pull() error {
Expand Down Expand Up @@ -154,12 +179,12 @@ func (r *ConfigFileRepo) pull() error {

// 拉取成功
if responseCode == uint32(apimodel.Code_ExecuteSuccess) {
remoteConfigFile := r.loadRemoteFile()
// 本地配置文件落后,更新内存缓存
if r.remoteConfigFile == nil || pulledConfigFile.Version >= r.remoteConfigFile.Version {
r.remoteConfigFile = deepCloneConfigFile(pulledConfigFile)
r.fireChangeEvent(pulledConfigFile.GetContent())
if remoteConfigFile == nil || pulledConfigFile.Version >= remoteConfigFile.Version {
// save into local_cache
r.saveCacheConfigFile(pulledConfigFile)
r.fireChangeEvent(pulledConfigFile)
}
return nil
}
Expand All @@ -173,9 +198,8 @@ func (r *ConfigFileRepo) pull() error {
FileGroup: pullConfigFileReq.FileGroup,
FileName: pullConfigFileReq.FileName,
})
if r.remoteConfigFile != nil {
r.remoteConfigFile = nil
r.fireChangeEvent(NotExistedFileContent)
if remoteConfigFile := r.loadRemoteFile(); remoteConfigFile != nil {
r.fireChangeEvent(_notExistFile)
}
return nil
}
Expand All @@ -188,9 +212,51 @@ func (r *ConfigFileRepo) pull() error {
retryTimes++
r.retryPolicy.delay()
}
r.fallbackIfNecessary(retryTimes, pullConfigFileReq)
return err
}

const (
PullConfigMaxRetryTimes = 3
)

func (r *ConfigFileRepo) fallbackIfNecessary(retryTimes int, req *configconnector.ConfigFile) {
if !(retryTimes >= PullConfigMaxRetryTimes && r.fallbackToLocalCache) {
return
}
cacheVal := &configconnector.ConfigFile{}
fileName := fmt.Sprintf(PatternService, url.QueryEscape(req.Namespace), url.QueryEscape(req.FileGroup),
url.QueryEscape(req.FileName)) + CacheSuffix
if err := r.persistHandler.LoadMessageFromFile(fileName, cacheVal); err != nil {
return
}
log.GetBaseLogger().Errorf("[Config] fallback to local cache success.")

response, err := r.chain.Execute(req, func(configFile *configconnector.ConfigFile) (*configconnector.ConfigFileResponse, error) {
return &configconnector.ConfigFileResponse{
Code: uint32(apimodel.Code_ExecuteSuccess),
ConfigFile: &configconnector.ConfigFile{
Namespace: cacheVal.Namespace,
FileGroup: cacheVal.FileGroup,
FileName: cacheVal.FileName,
SourceContent: cacheVal.SourceContent,
Version: cacheVal.Version,
Md5: cacheVal.Md5,
Encrypted: cacheVal.Encrypted,
Tags: cacheVal.Tags,
},
}, nil
})
if err != nil {
log.GetBaseLogger().Errorf("[Config] fallback to local cache fail. %+v", err)
return
}
localFile := response.ConfigFile
localFile.SetContent(localFile.GetSourceContent())

r.fireChangeEvent(localFile)
}

func (r *ConfigFileRepo) saveCacheConfigFile(file *configconnector.ConfigFile) {
fileName := fmt.Sprintf(PatternService, url.QueryEscape(file.Namespace), url.QueryEscape(file.FileGroup),
url.QueryEscape(file.FileName)) + CacheSuffix
Expand Down Expand Up @@ -221,12 +287,12 @@ func deepCloneConfigFile(sourceConfigFile *configconnector.ConfigFile) *configco
Encrypted: sourceConfigFile.GetEncrypted(),
Tags: tags,
}
ret.SetContent(sourceConfigFile.GetContent())
return ret
}

func (r *ConfigFileRepo) onLongPollingNotified(newVersion uint64) {
if r.remoteConfigFile != nil && r.remoteConfigFile.GetVersion() >= newVersion {
remoteConfigFile := r.loadRemoteFile()
if remoteConfigFile != nil && remoteConfigFile.GetVersion() >= newVersion {
return
}
r.notifiedVersion = newVersion
Expand All @@ -240,9 +306,18 @@ func (r *ConfigFileRepo) AddChangeListener(listener ConfigFileRepoChangeListener
r.listeners = append(r.listeners, listener)
}

func (r *ConfigFileRepo) fireChangeEvent(newContent string) {
func (r *ConfigFileRepo) fireChangeEvent(f *configconnector.ConfigFile) {
if f.GetContent() == "" {
f.SetContent(f.GetSourceContent())
}
if f.NotExist {
r.remoteConfigFileRef.Store(nil)
} else {
r.remoteConfigFileRef.Store(f)
}

for _, listener := range r.listeners {
if err := listener(r.configFileMetadata, newContent); err != nil {
if err := listener(r.configFileMetadata, f.GetContent()); err != nil {
log.GetBaseLogger().Errorf("[Config] invoke config file repo change listener failed.",
zap.Any("file", r.configFileMetadata), zap.Error(err))
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/plugin/configconnector/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type ConfigFile struct {

// 实际暴露给应用的配置内容数据
content string
// 该配置文件是否为不存在的场景下的占位信息
NotExist bool
}

func (c *ConfigFile) String() string {
Expand Down
Loading

0 comments on commit 6df6b41

Please sign in to comment.