diff --git a/common/constant/key.go b/common/constant/key.go index ff371d08c6..0c0c91f39e 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -100,6 +100,9 @@ const ( const ( CONFIG_NAMESPACE_KEY = "config.namespace" + CONFIG_GROUP_KEY = "config.group" + CONFIG_CLUSTER_KEY = "config.cluster" + CONFIG_CHECK_KEY = "config.check" CONFIG_TIMEOUT_KET = "config.timeout" CONFIG_VERSION_KEY = "configVersion" COMPATIBLE_CONFIG_KEY = "compatible_config" diff --git a/config/base_config.go b/config/base_config.go index 264eeda3cc..6678e7c681 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -47,7 +47,7 @@ type BaseConfig struct { } func (c *BaseConfig) startConfigCenter(ctx context.Context) error { - url, err := common.NewURL(ctx, c.ConfigCenterConfig.Address, common.WithProtocol(c.ConfigCenterConfig.Protocol)) + url, err := common.NewURL(ctx, c.ConfigCenterConfig.Address, common.WithProtocol(c.ConfigCenterConfig.Protocol), common.WithParams(c.ConfigCenterConfig.GetUrlMap())) if err != nil { return err } diff --git a/config/base_config_test.go b/config/base_config_test.go index 6dc3749e55..54def2ae1b 100644 --- a/config/base_config_test.go +++ b/config/base_config_test.go @@ -29,6 +29,7 @@ import ( "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config_center" + _ "github.com/apache/dubbo-go/config_center/apollo" ) func Test_refresh(t *testing.T) { diff --git a/config/config_center_config.go b/config/config_center_config.go index ed43558956..9c100b3497 100644 --- a/config/config_center_config.go +++ b/config/config_center_config.go @@ -19,6 +19,7 @@ package config import ( "context" + "net/url" "time" ) @@ -26,6 +27,10 @@ import ( "github.com/creasty/defaults" ) +import ( + "github.com/apache/dubbo-go/common/constant" +) + type ConfigCenterConfig struct { context context.Context Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` @@ -35,6 +40,7 @@ type ConfigCenterConfig struct { Username string `yaml:"username" json:"username,omitempty"` Password string `yaml:"password" json:"password,omitempty"` ConfigFile string `default:"dubbo.properties" yaml:"config_file" json:"config_file,omitempty"` + Namespace string `default:"dubbo.properties" yaml:"namespace" json:"namespace,omitempty"` AppConfigFile string `default:"dubbo.properties" yaml:"app_config_file" json:"app_config_file,omitempty"` TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"` timeout time.Duration @@ -50,3 +56,11 @@ func (c *ConfigCenterConfig) UnmarshalYAML(unmarshal func(interface{}) error) er } return nil } + +func (c *ConfigCenterConfig) GetUrlMap() url.Values { + urlMap := url.Values{} + urlMap.Set(constant.CONFIG_NAMESPACE_KEY, c.Namespace) + urlMap.Set(constant.CONFIG_GROUP_KEY, c.Group) + urlMap.Set(constant.CONFIG_CLUSTER_KEY, c.Cluster) + return urlMap +} diff --git a/config/testdata/consumer_config.properties b/config/testdata/consumer_config.properties new file mode 100644 index 0000000000..da9fe4f3b3 --- /dev/null +++ b/config/testdata/consumer_config.properties @@ -0,0 +1,52 @@ +filter= +request_timeout=100ms +connect_timeout=100ms +check=true +application.organization=ikurento.com +application.name=BDTService +application.module=dubbogo user-info client +application.version=0.0.1 +application.owner=ZX +application.environment=dev +registries.hangzhouzk.protocol=zookeeper +registries.hangzhouzk.timeout=3s +registries.hangzhouzk.address=127.0.0.1:2181 +registries.hangzhouzk.username= +registries.hangzhouzk.password= +registries.shanghaizk.protocol=zookeeper +registries.shanghaizk.timeout=3s +registries.shanghaizk.address=127.0.0.1:2182 +registries.shanghaizk.username= +registries.shanghaizk.password= +references.UserProvider.registry=hangzhouzk,shanghaizk +references.UserProvider.filter= +references.UserProvider.version=1.0 +references.UserProvider.group=as +references.UserProvider.interface=com.ikurento.user.UserProvider +references.UserProvider.url=dubbo://127.0.0.1:20000/UserProvider +references.UserProvider.cluster=failover +references.UserProvider.methods[0].name=GetUser +references.UserProvider.methods[0].retries=3 +references.UserProvider.params.serviceid=soa.com.ikurento.user.UserProvider +references.UserProvider.params.forks=5 +protocol_conf.dubbo.reconnect_interval=0 +protocol_conf.dubbo.connection_number=2 +protocol_conf.dubbo.heartbeat_period=5s +protocol_conf.dubbo.session_timeout=20s +protocol_conf.dubbo.pool_size=64 +protocol_conf.dubbo.pool_ttl=600 +protocol_conf.dubbo.gr_pool_size=1200 +protocol_conf.dubbo.queue_len=64 +protocol_conf.dubbo.queue_number=60 +protocol_conf.dubbo.getty_session_param.compress_encoding=false +protocol_conf.dubbo.getty_session_param.tcp_no_delay=true +protocol_conf.dubbo.getty_session_param.tcp_keep_alive=true +protocol_conf.dubbo.getty_session_param.keep_alive_period=120s +protocol_conf.dubbo.getty_session_param.tcp_r_buf_size=262144 +protocol_conf.dubbo.getty_session_param.tcp_w_buf_size=65536 +protocol_conf.dubbo.getty_session_param.pkg_wq_size=512 +protocol_conf.dubbo.getty_session_param.tcp_read_timeout=1s +protocol_conf.dubbo.getty_session_param.tcp_write_timeout=5s +protocol_conf.dubbo.getty_session_param.wait_timeout=1s +protocol_conf.dubbo.getty_session_param.max_msg_len=1024 +protocol_conf.dubbo.getty_session_param.session_name=client \ No newline at end of file diff --git a/config/testdata/consumer_config_with_configcenter_apollo.yml b/config/testdata/consumer_config_with_configcenter_apollo.yml new file mode 100644 index 0000000000..49b8fff595 --- /dev/null +++ b/config/testdata/consumer_config_with_configcenter_apollo.yml @@ -0,0 +1,24 @@ +# use apollo config center for fetch config file +# default config file namespace is dubbo.properties +# consumer config file Ref:consumer_config.properties +# provider config file Ref:provider_config.properties +config_center: + protocol: apollo + address: 106.12.25.204:8080 + group: testApplication_yang + cluster: dev + # 'namespace' can be used for router rule , default value is dubbo.properties + # but if you want to change router rule config file ,just open this item +# namespace: governance.properties + # 'config_file' is not necessary ,default : dubbo.properties + # but if you want to change config file ,just open this item +# config_file: mockDubbog.properties + +# application config required +application: + organization: "ikurento.com" + name: "BDTService" + module: "dubbogo user-info server" + version: "0.0.1" + owner: "ZX" + environment: "dev" \ No newline at end of file diff --git a/config/testdata/provider_config.properties b/config/testdata/provider_config.properties new file mode 100644 index 0000000000..f7d70f5cd6 --- /dev/null +++ b/config/testdata/provider_config.properties @@ -0,0 +1,58 @@ +filter= +application.organization=ikurento.com +application.name=BDTService +application.module=dubbogo user-info server +application.version=0.0.1 +application.owner=ZX +application.environment=dev +registries.hangzhouzk.protocol=zookeeper +registries.hangzhouzk.timeout=3s +registries.hangzhouzk.address=127.0.0.1:2181 +registries.hangzhouzk.username= +registries.hangzhouzk.password= +registries.shanghaizk.protocol=zookeeper +registries.shanghaizk.timeout=3s +registries.shanghaizk.address=127.0.0.1:2182 +registries.shanghaizk.username= +registries.shanghaizk.password= +services.UserProvider.registry=hangzhouzk,shanghaizk +services.UserProvider.filter= +services.UserProvider.tps.limiter=default +services.UserProvider.tps.limit.interval=60000 +services.UserProvider.tps.limit.rate=200 +services.UserProvider.tps.limit.strategy=slidingWindow +services.UserProvider.tps.limit.rejected.handler=default +services.UserProvider.execute.limit=200 +services.UserProvider.execute.limit.rejected.handler=default +services.UserProvider.protocol=dubbo +services.UserProvider.interface=com.ikurento.user.UserProvider +services.UserProvider.loadbalance=random +services.UserProvider.version=1.0 +services.UserProvider.group=as +services.UserProvider.warmup=100 +services.UserProvider.cluster=failover +services.UserProvider.methods[0].name=GetUser +services.UserProvider.methods[0].retries=1 +services.UserProvider.methods[0].loadbalance=random +services.UserProvider.methods[0].execute.limit=200 +services.UserProvider.methods[0].execute.limit.rejected.handler=default +protocols.dubbo.name=dubbo +protocols.dubbo.ip=127.0.0.1 +protocols.dubbo.port=20000 +protocol_conf.dubbo.session_number=700 +protocol_conf.dubbo.session_timeout=20s +protocol_conf.dubbo.gr_pool_size=120 +protocol_conf.dubbo.queue_len=64 +protocol_conf.dubbo.queue_number=6 +protocol_conf.dubbo.getty_session_param.compress_encoding=false +protocol_conf.dubbo.getty_session_param.tcp_no_delay=true +protocol_conf.dubbo.getty_session_param.tcp_keep_alive=true +protocol_conf.dubbo.getty_session_param.keep_alive_period=120s +protocol_conf.dubbo.getty_session_param.tcp_r_buf_size=262144 +protocol_conf.dubbo.getty_session_param.tcp_w_buf_size=65536 +protocol_conf.dubbo.getty_session_param.pkg_wq_size=512 +protocol_conf.dubbo.getty_session_param.tcp_read_timeout=1s +protocol_conf.dubbo.getty_session_param.tcp_write_timeout=5s +protocol_conf.dubbo.getty_session_param.wait_timeout=1s +protocol_conf.dubbo.getty_session_param.max_msg_len=1024 +protocol_conf.dubbo.getty_session_param.session_name=server \ No newline at end of file diff --git a/config_center/apollo/factory.go b/config_center/apollo/factory.go new file mode 100644 index 0000000000..47011be4a3 --- /dev/null +++ b/config_center/apollo/factory.go @@ -0,0 +1,45 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apollo + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + . "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +func init() { + extension.SetConfigCenterFactory("apollo", createDynamicConfigurationFactory) +} + +func createDynamicConfigurationFactory() DynamicConfigurationFactory { + return &apolloConfigurationFactory{} +} + +type apolloConfigurationFactory struct{} + +func (f *apolloConfigurationFactory) GetDynamicConfiguration(url *common.URL) (DynamicConfiguration, error) { + dynamicConfiguration, err := newApolloConfiguration(url) + if err != nil { + return nil, err + } + dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{}) + return dynamicConfiguration, err + +} diff --git a/config_center/apollo/impl.go b/config_center/apollo/impl.go new file mode 100644 index 0000000000..4eff318e54 --- /dev/null +++ b/config_center/apollo/impl.go @@ -0,0 +1,176 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apollo + +import ( + "fmt" + "regexp" + "strings" + "sync" +) + +import ( + "github.com/pkg/errors" + "github.com/zouyx/agollo" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + . "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" + "github.com/apache/dubbo-go/remoting" +) + +const ( + apolloProtocolPrefix = "http://" + apolloConfigFormat = "%s.%s" +) + +type apolloConfiguration struct { + url *common.URL + + listeners sync.Map + appConf *agollo.AppConfig + parser parser.ConfigurationParser +} + +func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) { + c := &apolloConfiguration{ + url: url, + } + configAddr := c.getAddressWithProtocolPrefix(url) + configCluster := url.GetParam(constant.CONFIG_CLUSTER_KEY, "") + + appId := url.GetParam(constant.CONFIG_GROUP_KEY, DEFAULT_GROUP) + namespaces := url.GetParam(constant.CONFIG_NAMESPACE_KEY, getProperties(DEFAULT_GROUP)) + c.appConf = &agollo.AppConfig{ + AppId: appId, + Cluster: configCluster, + NamespaceName: namespaces, + Ip: configAddr, + } + + agollo.InitCustomConfig(func() (*agollo.AppConfig, error) { + return c.appConf, nil + }) + + return c, agollo.Start() +} + +func getChangeType(change agollo.ConfigChangeType) remoting.EventType { + switch change { + case agollo.ADDED: + return remoting.EventTypeAdd + case agollo.DELETED: + return remoting.EventTypeDel + default: + return remoting.EventTypeUpdate + } +} + +func (c *apolloConfiguration) AddListener(key string, listener ConfigurationListener, opts ...Option) { + k := &Options{} + for _, opt := range opts { + opt(k) + } + + key = k.Group + key + l, _ := c.listeners.LoadOrStore(key, NewApolloListener()) + l.(*apolloListener).AddListener(listener) +} + +func (c *apolloConfiguration) RemoveListener(key string, listener ConfigurationListener, opts ...Option) { + k := &Options{} + for _, opt := range opts { + opt(k) + } + + key = k.Group + key + l, ok := c.listeners.Load(key) + if ok { + l.(*apolloListener).RemoveListener(listener) + } +} + +func getProperties(namespace string) string { + return getNamespaceName(namespace, agollo.Properties) +} + +func getNamespaceName(namespace string, configFileFormat agollo.ConfigFileFormat) string { + return fmt.Sprintf(apolloConfigFormat, namespace, configFileFormat) +} + +func (c *apolloConfiguration) GetConfig(key string, opts ...Option) (string, error) { + k := &Options{} + for _, opt := range opts { + opt(k) + } + /** + * when group is not null, we are getting startup configs(config file) from Config Center, for example: + * key=dubbo.propertie + */ + if len(k.Group) != 0 { + config := agollo.GetConfig(key) + if config == nil { + return "", errors.New(fmt.Sprintf("nothiing in namespace:%s ", key)) + } + return config.GetContent(agollo.Properties), nil + } + + /** + * when group is null, we are fetching governance rules(config item) from Config Center, for example: + * namespace=use default, key =application.organization + */ + config := agollo.GetConfig(c.appConf.NamespaceName) + if config == nil { + return "", errors.New(fmt.Sprintf("nothiing in namespace:%s ", key)) + } + return config.GetStringValue(key, ""), nil +} + +func (c *apolloConfiguration) getAddressWithProtocolPrefix(url *common.URL) string { + address := url.Location + converted := address + if len(address) != 0 { + reg := regexp.MustCompile("\\s+") + address = reg.ReplaceAllString(address, "") + parts := strings.Split(address, ",") + addrs := make([]string, 0) + for _, part := range parts { + addr := part + if !strings.HasPrefix(part, apolloProtocolPrefix) { + addr = apolloProtocolPrefix + part + } + addrs = append(addrs, addr) + } + converted = strings.Join(addrs, ",") + } + return converted +} + +func (c *apolloConfiguration) Parser() parser.ConfigurationParser { + return c.parser +} +func (c *apolloConfiguration) SetParser(p parser.ConfigurationParser) { + c.parser = p +} + +func (c *apolloConfiguration) GetConfigs(key string, opts ...Option) (string, error) { + return c.GetConfig(key, opts...) +} diff --git a/config_center/apollo/impl_test.go b/config_center/apollo/impl_test.go new file mode 100644 index 0000000000..2bb8b0ad69 --- /dev/null +++ b/config_center/apollo/impl_test.go @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apollo + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" + "github.com/apache/dubbo-go/remoting" +) + +const ( + mockAppId = "testApplication_yang" + mockCluster = "dev" + mockNamespace = "mockDubbog.properties" + mockNotifyRes = `[{ + "namespaceName": "mockDubbog.properties", + "notificationId": 53050, + "messages": { + "details": { + "testApplication_yang+default+mockDubbog": 53050 + } + } +}]` + mockServiceConfigRes = `[{ + "appName": "APOLLO-CONFIGSERVICE", + "instanceId": "instance-300408ep:apollo-configservice:8080", + "homepageUrl": "http://localhost:8080" +}]` +) + +var ( + mockConfigRes = `{ + "appId": "testApplication_yang", + "cluster": "default", + "namespaceName": "mockDubbog.properties", + "configurations": { + "registries.hangzhouzk.username": "", + "application.owner": "ZX", + "registries.shanghaizk.username": "", + "protocols.dubbo.ip": "127.0.0.1", + "protocol_conf.dubbo.getty_session_param.tcp_write_timeout": "5s", + "services.UserProvider.cluster": "failover", + "application.module": "dubbogo user-info server", + "services.UserProvider.interface": "com.ikurento.user.UserProvider", + "protocol_conf.dubbo.getty_session_param.compress_encoding": "false", + "registries.shanghaizk.address": "127.0.0.1:2182", + "protocol_conf.dubbo.session_timeout": "20s", + "registries.shanghaizk.timeout": "3s", + "protocol_conf.dubbo.getty_session_param.keep_alive_period": "120s", + "services.UserProvider.warmup": "100", + "application.version": "0.0.1", + "registries.hangzhouzk.protocol": "zookeeper", + "registries.hangzhouzk.password": "", + "protocols.dubbo.name": "dubbo", + "protocol_conf.dubbo.getty_session_param.wait_timeout": "1s", + "protocols.dubbo.port": "20000", + "application_config.owner": "demo", + "application_config.name": "demo", + "application_config.version": "0.0.1", + "application_config.environment": "dev", + "protocol_conf.dubbo.getty_session_param.session_name": "server", + "application.name": "BDTService", + "registries.hangzhouzk.timeout": "3s", + "protocol_conf.dubbo.getty_session_param.tcp_read_timeout": "1s", + "services.UserProvider.loadbalance": "random", + "protocol_conf.dubbo.session_number": "700", + "protocol_conf.dubbo.getty_session_param.max_msg_len": "1024", + "services.UserProvider.registry": "hangzhouzk", + "application_config.module": "demo", + "services.UserProvider.methods[0].name": "GetUser", + "protocol_conf.dubbo.getty_session_param.tcp_no_delay": "true", + "services.UserProvider.methods[0].retries": "1", + "protocol_conf.dubbo.getty_session_param.tcp_w_buf_size": "65536", + "protocol_conf.dubbo.getty_session_param.tcp_r_buf_size": "262144", + "registries.shanghaizk.password": "", + "application_config.organization": "demo", + "registries.shanghaizk.protocol": "zookeeper", + "protocol_conf.dubbo.getty_session_param.tcp_keep_alive": "true", + "registries.hangzhouzk.address": "127.0.0.1:2181", + "application.environment": "dev", + "services.UserProvider.protocol": "dubbo", + "application.organization": "ikurento.com", + "protocol_conf.dubbo.getty_session_param.pkg_wq_size": "512", + "services.UserProvider.methods[0].loadbalance": "random" + }, + "releaseKey": "20191104105242-0f13805d89f834a4" +}` +) + +func initApollo() *httptest.Server { + handlerMap := make(map[string]func(http.ResponseWriter, *http.Request), 1) + handlerMap[mockNamespace] = configResponse + + return runMockConfigServer(handlerMap, notifyResponse) +} + +func configResponse(rw http.ResponseWriter, req *http.Request) { + result := fmt.Sprintf(mockConfigRes) + fmt.Fprintf(rw, "%s", result) +} + +func notifyResponse(rw http.ResponseWriter, req *http.Request) { + result := fmt.Sprintf(mockNotifyRes) + fmt.Fprintf(rw, "%s", result) +} + +func serviceConfigResponse(rw http.ResponseWriter, req *http.Request) { + result := fmt.Sprintf(mockServiceConfigRes) + fmt.Fprintf(rw, "%s", result) +} + +//run mock config server +func runMockConfigServer(handlerMap map[string]func(http.ResponseWriter, *http.Request), + notifyHandler func(http.ResponseWriter, *http.Request)) *httptest.Server { + uriHandlerMap := make(map[string]func(http.ResponseWriter, *http.Request), 0) + for namespace, handler := range handlerMap { + uri := fmt.Sprintf("/configs/%s/%s/%s", mockAppId, mockCluster, namespace) + uriHandlerMap[uri] = handler + } + uriHandlerMap["/notifications/v2"] = notifyHandler + uriHandlerMap["/services/config"] = serviceConfigResponse + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + uri := r.RequestURI + for path, handler := range uriHandlerMap { + if strings.HasPrefix(uri, path) { + handler(w, r) + break + } + } + })) + + return ts +} + +func Test_GetConfig(t *testing.T) { + configuration := initMockApollo(t) + configs, err := configuration.GetConfig(mockNamespace, config_center.WithGroup("dubbo")) + assert.NoError(t, err) + configuration.SetParser(&parser.DefaultConfigurationParser{}) + mapContent, err := configuration.Parser().Parse(configs) + assert.NoError(t, err) + assert.Equal(t, "ikurento.com", mapContent["application.organization"]) +} + +func Test_GetConfigItem(t *testing.T) { + configuration := initMockApollo(t) + configs, err := configuration.GetConfig("application.organization") + assert.NoError(t, err) + configuration.SetParser(&parser.DefaultConfigurationParser{}) + assert.NoError(t, err) + assert.Equal(t, "ikurento.com", configs) +} + +func initMockApollo(t *testing.T) *apolloConfiguration { + c := &config.BaseConfig{ConfigCenterConfig: &config.ConfigCenterConfig{ + Protocol: "apollo", + Address: "106.12.25.204:8080", + Group: "testApplication_yang", + Cluster: "dev", + Namespace: "mockDubbog.properties", + }} + apollo := initApollo() + apolloUrl := strings.ReplaceAll(apollo.URL, "http", "apollo") + url, err := common.NewURL(context.TODO(), apolloUrl, common.WithParams(c.ConfigCenterConfig.GetUrlMap())) + assert.NoError(t, err) + configuration, err := newApolloConfiguration(&url) + assert.NoError(t, err) + return configuration +} + +func TestAddListener(t *testing.T) { + listener := &apolloDataListener{} + listener.wg.Add(1) + apollo := initMockApollo(t) + mockConfigRes = `{ + "appId": "testApplication_yang", + "cluster": "default", + "namespaceName": "mockDubbog.properties", + "configurations": { + "registries.hangzhouzk.username": "11111" + }, + "releaseKey": "20191104105242-0f13805d89f834a4" +}` + apollo.AddListener(mockNamespace, listener) + listener.wg.Wait() + assert.Equal(t, "registries.hangzhouzk.username", listener.event) + assert.Greater(t, listener.count, 0) +} + +func TestRemoveListener(t *testing.T) { + listener := &apolloDataListener{} + apollo := initMockApollo(t) + mockConfigRes = `{ + "appId": "testApplication_yang", + "cluster": "default", + "namespaceName": "mockDubbog.properties", + "configurations": { + "registries.hangzhouzk.username": "11111" + }, + "releaseKey": "20191104105242-0f13805d89f834a4" +}` + apollo.AddListener(mockNamespace, listener) + apollo.RemoveListener(mockNamespace, listener) + assert.Equal(t, "", listener.event) + listenerCount := 0 + apollo.listeners.Range(func(key, value interface{}) bool { + apolloListener := value.(*apolloListener) + for e := range apolloListener.listeners { + fmt.Println(e) + listenerCount++ + } + return true + }) + assert.Equal(t, listenerCount, 0) + assert.Equal(t, listener.count, 0) +} + +type apolloDataListener struct { + wg sync.WaitGroup + count int + event string +} + +func (l *apolloDataListener) Process(configType *config_center.ConfigChangeEvent) { + if configType.ConfigType != remoting.EventTypeUpdate { + return + } + l.wg.Done() + l.count++ + l.event = configType.Key +} diff --git a/config_center/apollo/listener.go b/config_center/apollo/listener.go new file mode 100644 index 0000000000..d81e1cbf34 --- /dev/null +++ b/config_center/apollo/listener.go @@ -0,0 +1,59 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apollo + +import ( + "github.com/zouyx/agollo" +) + +import ( + "github.com/apache/dubbo-go/config_center" +) + +type apolloListener struct { + listeners map[config_center.ConfigurationListener]struct{} +} + +func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) { + for key, change := range changeEvent.Changes { + for listener := range a.listeners { + listener.Process(&config_center.ConfigChangeEvent{ + ConfigType: getChangeType(change.ChangeType), + Key: key, + Value: change.NewValue, + }) + } + } +} + +func NewApolloListener() *apolloListener { + return &apolloListener{ + listeners: make(map[config_center.ConfigurationListener]struct{}, 0), + } +} + +func (al *apolloListener) AddListener(l config_center.ConfigurationListener) { + if _, ok := al.listeners[l]; !ok { + al.listeners[l] = struct{}{} + agollo.AddChangeListener(al) + } +} + +func (al *apolloListener) RemoveListener(l config_center.ConfigurationListener) { + delete(al.listeners, l) +} diff --git a/config_center/zookeeper/factory.go b/config_center/zookeeper/factory.go index 611f4b9785..3f4690d4e0 100644 --- a/config_center/zookeeper/factory.go +++ b/config_center/zookeeper/factory.go @@ -17,10 +17,6 @@ package zookeeper -import ( - "sync" -) - import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" @@ -35,14 +31,8 @@ func init() { type zookeeperDynamicConfigurationFactory struct { } -var once sync.Once -var dynamicConfiguration *zookeeperDynamicConfiguration - func (f *zookeeperDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) { - var err error - once.Do(func() { - dynamicConfiguration, err = newZookeeperDynamicConfiguration(url) - }) + dynamicConfiguration, err := newZookeeperDynamicConfiguration(url) if err != nil { return nil, err } diff --git a/go.mod b/go.mod index 4d1f8acbba..6a9128af0c 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect + github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8 go.etcd.io/bbolt v1.3.3 // indirect go.etcd.io/etcd v3.3.13+incompatible go.uber.org/atomic v1.4.0 diff --git a/go.sum b/go.sum index bcde5b1f80..ee7072bd1d 100644 --- a/go.sum +++ b/go.sum @@ -453,6 +453,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto= github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ= github.com/tent/http-link-go v0.0.0-20130702225549-ac974c61c2f9/go.mod h1:RHkNRtSLfOK7qBTHaeSX1D6BNpI3qw7NTxsmNr4RvN8= +github.com/tevid/gohamcrest v1.1.1/go.mod h1:3UvtWlqm8j5JbwYZh80D/PVBt0mJ1eJiYgZMibh0H/k= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D4Jj5eZ41Zm3IH/J8OElK1Qtd7tVKAwLk= @@ -463,6 +464,8 @@ github.com/vmware/govmomi v0.18.0 h1:f7QxSmP7meCtoAmiKZogvVbLInT+CZx6Px6K5rYsJZo github.com/vmware/govmomi v0.18.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8 h1:k8TV7Gz7cpWpOw/dz71fx8cCZdWoPuckHJ/wkJl+meg= +github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8/go.mod h1:S1cAa98KMFv4Sa8SbJ6ZtvOmf0VlgH0QJ1gXI0lBfBY= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw=