diff --git a/README.md b/README.md index 5c4de0559d..9bade617c8 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Apache License, Version 2.0 ## Release note ## -[v1.4.0-rc1 - Mar 12, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0-rc1) +[v1.4.0 - Mar 17, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0) [v1.3.0 - Mar 1, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.3.0) diff --git a/README_CN.md b/README_CN.md index 5dec68fd61..180759f366 100644 --- a/README_CN.md +++ b/README_CN.md @@ -15,7 +15,7 @@ Apache License, Version 2.0 ## 发布日志 ## -[v1.4.0-rc1 - 2020年3月12日](https://github.com/apache/dubbo-go/releases/tag/v1.4.0-rc1) +[v1.4.0 - 2020年3月17日](https://github.com/apache/dubbo-go/releases/tag/v1.4.0) [v1.3.0 - 2020年3月1日](https://github.com/apache/dubbo-go/releases/tag/v1.3.0) diff --git a/common/extension/registry_directory.go b/common/extension/registry_directory.go new file mode 100644 index 0000000000..6b92189c4e --- /dev/null +++ b/common/extension/registry_directory.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/registry" +) + +type registryDirectory func(url *common.URL, registry registry.Registry) (cluster.Directory, error) + +var defaultRegistry registryDirectory + +// SetDefaultRegistryDirectory ... +func SetDefaultRegistryDirectory(v registryDirectory) { + defaultRegistry = v +} + +// GetDefaultRegistryDirectory ... +func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (cluster.Directory, error) { + if defaultRegistry == nil { + panic("registry directory is not existing, make sure you have import the package.") + } + return defaultRegistry(config, registry) +} diff --git a/common/extension/service_instance_selector_factory.go b/common/extension/service_instance_selector_factory.go index 1dd1ac6c99..d767e0a7dc 100644 --- a/common/extension/service_instance_selector_factory.go +++ b/common/extension/service_instance_selector_factory.go @@ -18,7 +18,7 @@ package extension import ( - "github.com/apache/dubbo-go/registry/service/instance" + "github.com/apache/dubbo-go/registry/servicediscovery/instance" perrors "github.com/pkg/errors" ) diff --git a/common/observer/dispatcher/mock_event_dispatcher.go b/common/observer/dispatcher/mock_event_dispatcher.go new file mode 100644 index 0000000000..45cdaa71a2 --- /dev/null +++ b/common/observer/dispatcher/mock_event_dispatcher.go @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dispatcher + +import ( + "github.com/apache/dubbo-go/common/observer" +) + +// MockEventDispatcher will do nothing. +// It is only used by tests +// Now the implementation doing nothing, +// But you can modify this if needed +type MockEventDispatcher struct { +} + +// AddEventListener do nothing +func (m MockEventDispatcher) AddEventListener(listener observer.EventListener) { +} + +// AddEventListeners do nothing +func (m MockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) { +} + +// RemoveEventListener do nothing +func (m MockEventDispatcher) RemoveEventListener(listener observer.EventListener) { +} + +// RemoveEventListeners do nothing +func (m MockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) { +} + +// GetAllEventListeners return empty list +func (m MockEventDispatcher) GetAllEventListeners() []observer.EventListener { + return make([]observer.EventListener, 0) +} + +// RemoveAllEventListeners do nothing +func (m MockEventDispatcher) RemoveAllEventListeners() { +} + +// Dispatch do nothing +func (m MockEventDispatcher) Dispatch(event observer.Event) { +} diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index 68ba3ff788..f98a44873a 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -44,7 +44,7 @@ var ( typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type() ) -// NewProxy ... +// NewProxy create service proxy. func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[string]string) *Proxy { return &Proxy{ invoke: invoke, @@ -59,7 +59,6 @@ func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[str // type XxxProvider struct { // Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error // } - func (p *Proxy) Implement(v common.RPCService) { // check parameters, incoming interface must be a elem's pointer. @@ -202,12 +201,12 @@ func (p *Proxy) Implement(v common.RPCService) { } -// Get ... +// Get get rpc service instance. func (p *Proxy) Get() common.RPCService { return p.rpc } -// GetCallback ... +// GetCallback get callback. func (p *Proxy) GetCallback() interface{} { return p.callBack } diff --git a/common/proxy/proxy_factory.go b/common/proxy/proxy_factory.go index 7b249a3e97..34fa3fd07e 100644 --- a/common/proxy/proxy_factory.go +++ b/common/proxy/proxy_factory.go @@ -22,7 +22,7 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// ProxyFactory ... +// ProxyFactory interface. type ProxyFactory interface { GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy diff --git a/common/rpc_service.go b/common/rpc_service.go index 3a25d32239..90ebdaa6dc 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -59,7 +59,6 @@ type AsyncCallback func(response CallbackResponse) // return map[string][string]{} // } const ( - // METHOD_MAPPER ... METHOD_MAPPER = "MethodMapper" ) @@ -68,7 +67,7 @@ var ( // because Typeof takes an empty interface value. This is annoying. typeOfError = reflect.TypeOf((*error)(nil)).Elem() - // ServiceMap ... + // ServiceMap store description of service. // todo: lowerecas? ServiceMap = &serviceMap{ serviceMap: make(map[string]map[string]*Service), @@ -80,7 +79,7 @@ var ( // info of method ////////////////////////// -// MethodType ... +// MethodType is description of service method. type MethodType struct { method reflect.Method ctxType reflect.Type // request context @@ -88,27 +87,27 @@ type MethodType struct { replyType reflect.Type // return value, otherwise it is nil } -// Method ... +// Method get @m.method. func (m *MethodType) Method() reflect.Method { return m.method } -// CtxType ... +// CtxType get @m.ctxType. func (m *MethodType) CtxType() reflect.Type { return m.ctxType } -// ArgsType ... +// ArgsType get @m.argsType. func (m *MethodType) ArgsType() []reflect.Type { return m.argsType } -// ReplyType ... +// ReplyType get @m.replyType. func (m *MethodType) ReplyType() reflect.Type { return m.replyType } -// SuiteContext ... +// SuiteContext tranfer @ctx to reflect.Value type or get it from @m.ctxType. func (m *MethodType) SuiteContext(ctx context.Context) reflect.Value { if contextv := reflect.ValueOf(ctx); contextv.IsValid() { return contextv @@ -120,7 +119,7 @@ func (m *MethodType) SuiteContext(ctx context.Context) reflect.Value { // info of service interface ////////////////////////// -// Service ... +// Service is description of service type Service struct { name string rcvr reflect.Value @@ -128,7 +127,7 @@ type Service struct { methods map[string]*MethodType } -// Method ... +// Method get @s.methods. func (s *Service) Method() map[string]*MethodType { return s.methods } @@ -138,12 +137,12 @@ func (s *Service) Name() string { return s.name } -// RcvrType ... +// RcvrType get @s.rcvrType. func (s *Service) RcvrType() reflect.Type { return s.rcvrType } -// Rcvr ... +// Rcvr get @s.rcvr. func (s *Service) Rcvr() reflect.Value { return s.rcvr } diff --git a/common/url.go b/common/url.go index ebb648db27..a70ac7dc9d 100644 --- a/common/url.go +++ b/common/url.go @@ -195,7 +195,6 @@ func NewURLWithOptions(opts ...option) *URL { // NewURL will create a new url // the urlString should not be empty func NewURL(urlString string, opts ...option) (URL, error) { - var ( err error rawUrlString string @@ -249,7 +248,7 @@ func NewURL(urlString string, opts ...option) (URL, error) { return s, nil } -// URLEqual ... +// URLEqual judge @url and @c is equal or not. func (c URL) URLEqual(url URL) bool { c.Ip = "" c.Port = "" @@ -265,17 +264,19 @@ func (c URL) URLEqual(url URL) bool { } else if urlGroup == constant.ANY_VALUE { urlKey = strings.Replace(urlKey, "group=*", "group="+cGroup, 1) } + + // 1. protocol, username, password, ip, port, service name, group, version should be equal if cKey != urlKey { return false } + + // 2. if url contains enabled key, should be true, or * if url.GetParam(constant.ENABLED_KEY, "true") != "true" && url.GetParam(constant.ENABLED_KEY, "") != constant.ANY_VALUE { return false } + //TODO :may need add interface key any value condition - if !isMatchCategory(url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), c.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY)) { - return false - } - return true + return isMatchCategory(url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), c.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY)) } func isMatchCategory(category1 string, category2 string) bool { @@ -313,10 +314,9 @@ func (c URL) Key() string { "%s://%s:%s@%s:%s/?interface=%s&group=%s&version=%s", c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Service(), c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, "")) return buildString - //return c.ServiceKey() } -// ServiceKey ... +// ServiceKey get a unique key of a service. func (c URL) ServiceKey() string { intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) if intf == "" { @@ -409,12 +409,12 @@ func (c *URL) RangeParams(f func(key, value string) bool) { // GetParam ... func (c URL) GetParam(s string, d string) string { - var r string c.paramsLock.RLock() - if r = c.params.Get(s); len(r) == 0 { + defer c.paramsLock.RUnlock() + r := c.params.Get(s) + if len(r) == 0 { r = d } - c.paramsLock.RUnlock() return r } @@ -454,10 +454,8 @@ func (c URL) GetRawParam(key string) string { // GetParamBool ... func (c URL) GetParamBool(s string, d bool) bool { - - var r bool - var err error - if r, err = strconv.ParseBool(c.GetParam(s, "")); err != nil { + r, err := strconv.ParseBool(c.GetParam(s, "")) + if err != nil { return d } return r @@ -465,10 +463,8 @@ func (c URL) GetParamBool(s string, d bool) bool { // GetParamInt ... func (c URL) GetParamInt(s string, d int64) int64 { - var r int - var err error - - if r, err = strconv.Atoi(c.GetParam(s, "")); r == 0 || err != nil { + r, err := strconv.Atoi(c.GetParam(s, "")) + if r == 0 || err != nil { return d } return int64(r) @@ -476,11 +472,8 @@ func (c URL) GetParamInt(s string, d int64) int64 { // GetMethodParamInt ... func (c URL) GetMethodParamInt(method string, key string, d int64) int64 { - var r int - var err error - c.paramsLock.RLock() - defer c.paramsLock.RUnlock() - if r, err = strconv.Atoi(c.GetParam("methods."+method+"."+key, "")); r == 0 || err != nil { + r, err := strconv.Atoi(c.GetParam("methods."+method+"."+key, "")) + if r == 0 || err != nil { return d } return int64(r) @@ -492,14 +485,13 @@ func (c URL) GetMethodParamInt64(method string, key string, d int64) int64 { if r == math.MinInt64 { return c.GetParamInt(key, d) } - return r } // GetMethodParam ... func (c URL) GetMethodParam(method string, key string, d string) string { - var r string - if r = c.GetParam("methods."+method+"."+key, ""); r == "" { + r := c.GetParam("methods."+method+"."+key, "") + if r == "" { r = d } return r @@ -530,7 +522,6 @@ func (c *URL) SetParams(m url.Values) { // ToMap transfer URL to Map func (c URL) ToMap() map[string]string { - paramsMap := make(map[string]string) c.RangeParams(func(key, value string) bool { @@ -615,8 +606,30 @@ func (c *URL) Clone() *URL { return newUrl } +// Copy url based on the reserved parameters' keys. +func (c *URL) CloneWithParams(reserveParams []string) *URL { + params := url.Values{} + for _, reserveParam := range reserveParams { + v := c.GetParam(reserveParam, "") + if len(v) != 0 { + params.Set(reserveParam, v) + } + } + + return NewURLWithOptions( + WithProtocol(c.Protocol), + WithUsername(c.Username), + WithPassword(c.Password), + WithIp(c.Ip), + WithPort(c.Port), + WithPath(c.Path), + WithMethods(c.Methods), + WithParams(params), + ) +} + func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []func(method string) { - var methodConfigMergeFcn = []func(method string){} + methodConfigMergeFcn := make([]func(method string), 0, len(paramKeys)) for _, paramKey := range paramKeys { if v := referenceUrl.GetParam(paramKey, ""); len(v) > 0 { mergedUrl.SetParam(paramKey, v) diff --git a/config/config_loader.go b/config/config_loader.go index 6666409dd3..121ef39ff2 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -82,140 +82,146 @@ func checkApplicationName(config *ApplicationConfig) { } } -// Load Dubbo Init -func Load() { - - // init router - if confRouterFile != "" { - if errPro := RouterInit(confRouterFile); errPro != nil { - log.Printf("[routerConfig init] %#v", errPro) - } - } - - var eventDispatcherType string - if consumerConfig != nil { - eventDispatcherType = consumerConfig.eventDispatcherType - } - // notice consumerConfig.eventDispatcherType will be replaced - if providerConfig != nil { - eventDispatcherType = providerConfig.eventDispatcherType - } - // init EventDispatcher should before everything - extension.SetAndInitGlobalDispatcher(eventDispatcherType) - - // reference config +func loadConsumerConfig() { if consumerConfig == nil { logger.Warnf("consumerConfig is nil!") - } else { - // init other consumer config - conConfigType := consumerConfig.ConfigType - for key, value := range extension.GetDefaultConfigReader() { - if conConfigType == nil { - if v, ok := conConfigType[key]; ok { - value = v - } - } - if err := extension.GetConfigReaders(value).ReadConsumerConfig(consumerConfig.fileStream); err != nil { - logger.Errorf("ReadConsumerConfig error: %#v for %s", perrors.WithStack(err), value) + return + } + // init other consumer config + conConfigType := consumerConfig.ConfigType + for key, value := range extension.GetDefaultConfigReader() { + if conConfigType == nil { + if v, ok := conConfigType[key]; ok { + value = v } } + if err := extension.GetConfigReaders(value).ReadConsumerConfig(consumerConfig.fileStream); err != nil { + logger.Errorf("ReadConsumerConfig error: %#v for %s", perrors.WithStack(err), value) + } + } + + metricConfig = consumerConfig.MetricConfig + applicationConfig = consumerConfig.ApplicationConfig - metricConfig = consumerConfig.MetricConfig - applicationConfig = consumerConfig.ApplicationConfig + extension.SetAndInitGlobalDispatcher(consumerConfig.eventDispatcherType) - checkApplicationName(consumerConfig.ApplicationConfig) - if err := configCenterRefreshConsumer(); err != nil { - logger.Errorf("[consumer config center refresh] %#v", err) + checkApplicationName(consumerConfig.ApplicationConfig) + if err := configCenterRefreshConsumer(); err != nil { + logger.Errorf("[consumer config center refresh] %#v", err) + } + checkRegistries(consumerConfig.Registries, consumerConfig.Registry) + for key, ref := range consumerConfig.References { + if ref.Generic { + genericService := NewGenericService(key) + SetConsumerService(genericService) } - checkRegistries(consumerConfig.Registries, consumerConfig.Registry) - for key, ref := range consumerConfig.References { - if ref.Generic { - genericService := NewGenericService(key) - SetConsumerService(genericService) - } - rpcService := GetConsumerService(key) - if rpcService == nil { - logger.Warnf("%s does not exist!", key) - continue - } - ref.id = key - ref.Refer(rpcService) - ref.Implement(rpcService) + rpcService := GetConsumerService(key) + if rpcService == nil { + logger.Warnf("%s does not exist!", key) + continue } + ref.id = key + ref.Refer(rpcService) + ref.Implement(rpcService) + } - //wait for invoker is available, if wait over default 3s, then panic - var count int - checkok := true - for { - for _, refconfig := range consumerConfig.References { - if (refconfig.Check != nil && *refconfig.Check) || - (refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) || - (refconfig.Check == nil && consumerConfig.Check == nil) { //default to true - - if refconfig.invoker != nil && - !refconfig.invoker.IsAvailable() { - checkok = false - count++ - if count > maxWait { - errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version) - logger.Error(errMsg) - panic(errMsg) - } - time.Sleep(time.Second * 1) - break - } - if refconfig.invoker == nil { - logger.Warnf("The interface %s invoker not exist , may you should check your interface config.", refconfig.InterfaceName) + //wait for invoker is available, if wait over default 3s, then panic + var count int + checkok := true + for { + for _, refconfig := range consumerConfig.References { + if (refconfig.Check != nil && *refconfig.Check) || + (refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) || + (refconfig.Check == nil && consumerConfig.Check == nil) { //default to true + + if refconfig.invoker != nil && + !refconfig.invoker.IsAvailable() { + checkok = false + count++ + if count > maxWait { + errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version) + logger.Error(errMsg) + panic(errMsg) } + time.Sleep(time.Second * 1) + break + } + if refconfig.invoker == nil { + logger.Warnf("The interface %s invoker not exist , may you should check your interface config.", refconfig.InterfaceName) } } - if checkok { - break - } - checkok = true } + if checkok { + break + } + checkok = true } +} - // service config +func loadProviderConfig() { if providerConfig == nil { logger.Warnf("providerConfig is nil!") - } else { - // init other provider config - proConfigType := providerConfig.ConfigType - for key, value := range extension.GetDefaultConfigReader() { - if proConfigType != nil { - if v, ok := proConfigType[key]; ok { - value = v - } - } - if err := extension.GetConfigReaders(value).ReadProviderConfig(providerConfig.fileStream); err != nil { - logger.Errorf("ReadProviderConfig error: %#v for %s", perrors.WithStack(err), value) + return + } + + // init other provider config + proConfigType := providerConfig.ConfigType + for key, value := range extension.GetDefaultConfigReader() { + if proConfigType != nil { + if v, ok := proConfigType[key]; ok { + value = v } } + if err := extension.GetConfigReaders(value).ReadProviderConfig(providerConfig.fileStream); err != nil { + logger.Errorf("ReadProviderConfig error: %#v for %s", perrors.WithStack(err), value) + } + } - // so, you should know that the consumer's config will be override - metricConfig = providerConfig.MetricConfig - applicationConfig = providerConfig.ApplicationConfig + // so, you should know that the consumer's config will be override + metricConfig = providerConfig.MetricConfig + applicationConfig = providerConfig.ApplicationConfig - checkApplicationName(providerConfig.ApplicationConfig) - if err := configCenterRefreshProvider(); err != nil { - logger.Errorf("[provider config center refresh] %#v", err) + extension.SetAndInitGlobalDispatcher(consumerConfig.eventDispatcherType) + + checkApplicationName(providerConfig.ApplicationConfig) + if err := configCenterRefreshProvider(); err != nil { + logger.Errorf("[provider config center refresh] %#v", err) + } + checkRegistries(providerConfig.Registries, providerConfig.Registry) + for key, svs := range providerConfig.Services { + rpcService := GetProviderService(key) + if rpcService == nil { + logger.Warnf("%s does not exist!", key) + continue } - checkRegistries(providerConfig.Registries, providerConfig.Registry) - for key, svs := range providerConfig.Services { - rpcService := GetProviderService(key) - if rpcService == nil { - logger.Warnf("%s does not exist!", key) - continue - } - svs.id = key - svs.Implement(rpcService) - svs.Protocols = providerConfig.Protocols - if err := svs.Export(); err != nil { - panic(fmt.Sprintf("service %s export failed! err: %#v", key, err)) - } + svs.id = key + svs.Implement(rpcService) + if err := svs.Export(); err != nil { + panic(fmt.Sprintf("service %s export failed! err: %#v", key, err)) } } +} + +func initRouter() { + if confRouterFile != "" { + if err := RouterInit(confRouterFile); err != nil { + log.Printf("[routerConfig init] %#v", err) + } + } +} + +// Load Dubbo Init +func Load() { + + // init router + initRouter() + + // reference config + loadConsumerConfig() + + // service config + loadProviderConfig() + // init the shutdown callback GracefulShutdownInit() } diff --git a/config/registry_config.go b/config/registry_config.go index f3d22311b8..e877a2c19d 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -36,14 +36,15 @@ import ( // RegistryConfig ... type RegistryConfig struct { Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` - //I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig + // I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second Group string `yaml:"group" json:"group,omitempty" property:"group"` - //for registry - Address string `yaml:"address" json:"address,omitempty" property:"address"` - Username string `yaml:"username" json:"username,omitempty" property:"username"` - Password string `yaml:"password" json:"password,omitempty" property:"password"` - Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + // for registry + Address string `yaml:"address" json:"address,omitempty" property:"address"` + Username string `yaml:"username" json:"username,omitempty" property:"username"` + Password string `yaml:"password" json:"password,omitempty" property:"password"` + Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` } // UnmarshalYAML ... @@ -70,9 +71,11 @@ func loadRegistries(targetRegistries string, registries map[string]*RegistryConf for k, registryConf := range registries { target := false - // if user not config targetRegistries,default load all - // Notice:in func "func Split(s, sep string) []string" comment : if s does not contain sep and sep is not empty, SplitAfter returns a slice of length 1 whose only element is s. - // So we have to add the condition when targetRegistries string is not set (it will be "" when not set) + // if user not config targetRegistries, default load all + // Notice: in func "func Split(s, sep string) []string" comment: + // if s does not contain sep and sep is not empty, SplitAfter returns + // a slice of length 1 whose only element is s. So we have to add the + // condition when targetRegistries string is not set (it will be "" when not set) if len(trSlice) == 0 || (len(trSlice) == 1 && trSlice[0] == "") { target = true } else { @@ -86,29 +89,24 @@ func loadRegistries(targetRegistries string, registries map[string]*RegistryConf } if target { - var ( - url common.URL - err error - ) - addresses := strings.Split(registryConf.Address, ",") address := addresses[0] address = translateRegistryConf(address, registryConf) - url, err = common.NewURL(constant.REGISTRY_PROTOCOL+"://"+address, + url, err := common.NewURL(constant.REGISTRY_PROTOCOL+"://"+address, common.WithParams(registryConf.getUrlMap(roleType)), + common.WithParamsValue("simplified", strconv.FormatBool(registryConf.Simplified)), common.WithUsername(registryConf.Username), common.WithPassword(registryConf.Password), common.WithLocation(registryConf.Address), ) if err != nil { - logger.Errorf("The registry id:%s url is invalid , error: %#v", k, err) + logger.Errorf("The registry id: %s url is invalid, error: %#v", k, err) panic(err) } else { urls = append(urls, &url) } } - } return urls @@ -123,7 +121,6 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { for k, v := range c.Params { urlMap.Set(k, v) } - return urlMap } @@ -131,7 +128,7 @@ func translateRegistryConf(address string, registryConf *RegistryConfig) string if strings.Contains(address, "://") { translatedUrl, err := url.Parse(address) if err != nil { - logger.Errorf("The registry url is invalid , error: %#v", err) + logger.Errorf("The registry url is invalid, error: %#v", err) panic(err) } address = translatedUrl.Host diff --git a/config/service_config.go b/config/service_config.go index 9cf60c8fb3..4597c4a0a5 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -18,6 +18,7 @@ package config import ( + "container/list" "context" "fmt" "net/url" @@ -29,6 +30,7 @@ import ( import ( "github.com/creasty/defaults" + gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" "go.uber.org/atomic" ) @@ -111,6 +113,24 @@ func NewServiceConfig(id string, context context.Context) *ServiceConfig { } } +// Get Random Port +func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List { + ports := list.New() + for _, proto := range protocolConfigs { + if len(proto.Port) > 0 { + continue + } + + tcp, err := gxnet.ListenOnTCPRandomPort(proto.Ip) + if err != nil { + panic(perrors.New(fmt.Sprintf("Get tcp port error,err is {%v}", err))) + } + defer tcp.Close() + ports.PushBack(strings.Split(tcp.Addr().String(), ":")[1]) + } + return ports +} + // InitExported will set exported as false atom bool func (c *ServiceConfig) InitExported() { c.exported = atomic.NewBool(false) @@ -143,6 +163,9 @@ func (c *ServiceConfig) Export() error { logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs ", c.InterfaceName, c.Protocol) return nil } + + ports := getRandomPort(protocolConfigs) + nextPort := ports.Front() for _, proto := range protocolConfigs { // registry the service reflect methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.rpcService) @@ -151,11 +174,18 @@ func (c *ServiceConfig) Export() error { logger.Errorf(err.Error()) return err } + + port := proto.Port + + if len(proto.Port) == 0 { + port = nextPort.Value.(string) + nextPort = nextPort.Next() + } ivkURL := common.NewURLWithOptions( common.WithPath(c.id), common.WithProtocol(proto.Name), common.WithIp(proto.Ip), - common.WithPort(proto.Port), + common.WithPort(port), common.WithParams(urlMap), common.WithParamsValue(constant.BEAN_NAME_KEY, c.id), common.WithMethods(strings.Split(methods, ",")), diff --git a/config/service_config_test.go b/config/service_config_test.go index fb3b4a7dc2..efa2cafa36 100644 --- a/config/service_config_test.go +++ b/config/service_config_test.go @@ -21,6 +21,11 @@ import ( "testing" ) +import ( + gxnet "github.com/dubbogo/gost/net" + "github.com/stretchr/testify/assert" +) + import ( "go.uber.org/atomic" ) @@ -145,3 +150,35 @@ func Test_Export(t *testing.T) { } providerConfig = nil } + +func Test_getRandomPort(t *testing.T) { + protocolConfigs := make([]*ProtocolConfig, 0, 3) + + ip, err := gxnet.GetLocalIP() + protocolConfigs = append(protocolConfigs, &ProtocolConfig{ + Ip: ip, + }) + protocolConfigs = append(protocolConfigs, &ProtocolConfig{ + Ip: ip, + }) + protocolConfigs = append(protocolConfigs, &ProtocolConfig{ + Ip: ip, + }) + assert.NoError(t, err) + ports := getRandomPort(protocolConfigs) + + assert.Equal(t, ports.Len(), len(protocolConfigs)) + + front := ports.Front() + for { + if front == nil { + break + } + t.Logf("port:%v", front.Value) + front = front.Next() + } + + protocolConfigs = make([]*ProtocolConfig, 0, 3) + ports = getRandomPort(protocolConfigs) + assert.Equal(t, ports.Len(), len(protocolConfigs)) +} diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index 007b8be142..0b8aceb23c 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -36,7 +36,10 @@ import ( "github.com/apache/dubbo-go/config_center/parser" ) -const nacosClientName = "nacos config_center" +const ( + nacosClientName = "nacos config_center" + maxKeysNum = 9999 +) type nacosDynamicConfiguration struct { url *common.URL @@ -108,9 +111,23 @@ func (n *nacosDynamicConfiguration) PublishConfig(key string, group string, valu // GetConfigKeysByGroup will return all keys with the group func (n *nacosDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) { - // TODO (the golang client of nacos does not support batch API) - // we should build a issue and then think about how to resolve this problem - return nil, perrors.New("unsupport operation, wait for implement") + group = n.resolvedGroup(group) + page, err := (*n.client.Client()).SearchConfig(vo.SearchConfigParm{ + Search: "accurate", + Group: group, + PageNo: 1, + // actually it's impossible for user to create 9999 application under one group + PageSize: maxKeysNum, + }) + + result := gxset.NewSet() + if err != nil { + return result, perrors.WithMessage(err, "can not find the client config") + } + for _, itm := range page.PageItems { + result.Add(itm.Content) + } + return result, nil } // GetRule Get router rule diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go index 4032c91cda..b74c155d92 100644 --- a/config_center/nacos/impl_test.go +++ b/config_center/nacos/impl_test.go @@ -88,6 +88,34 @@ func Test_GetConfig(t *testing.T) { assert.NoError(t, err) } +func TestNacosDynamicConfiguration_GetConfigKeysByGroup(t *testing.T) { + data := ` +{ + "PageItems": [ + { + "Content": "application" + } + ] +} +` + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(data)) + })) + + nacosURL := strings.ReplaceAll(ts.URL, "http", "registry") + regurl, _ := common.NewURL(nacosURL) + nacosConfiguration, err := newNacosDynamicConfiguration(®url) + assert.NoError(t, err) + + nacosConfiguration.SetParser(&parser.DefaultConfigurationParser{}) + + configs, err := nacosConfiguration.GetConfigKeysByGroup("dubbo") + assert.Nil(t, err) + assert.Equal(t, 1, configs.Size()) + assert.True(t, configs.Contains("application")) + +} + func TestNacosDynamicConfiguration_PublishConfig(t *testing.T) { nacos, err := initNacosData(t) assert.Nil(t, err) @@ -109,7 +137,7 @@ func Test_AddListener(t *testing.T) { } func Test_RemoveListener(t *testing.T) { - //TODO not supported in current go_nacos_sdk version + // TODO not supported in current go_nacos_sdk version } type mockDataListener struct { diff --git a/filter/filter_impl/access_log_filter.go b/filter/filter_impl/access_log_filter.go index fbfe756517..49cdc2287c 100644 --- a/filter/filter_impl/access_log_filter.go +++ b/filter/filter_impl/access_log_filter.go @@ -71,14 +71,18 @@ func init() { * * the value of "accesslog" can be "true" or "default" too. * If the value is one of them, the access log will be record in log file which defined in log.yml + * AccessLogFilter is designed to be singleton */ type AccessLogFilter struct { logChan chan AccessLogData } -// Invoke ... +// Invoke will check whether user wants to use this filter. +// If we find the value of key constant.ACCESS_LOG_KEY, we will log the invocation info func (ef *AccessLogFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { accessLog := invoker.GetUrl().GetParam(constant.ACCESS_LOG_KEY, "") + + // the user do not if len(accessLog) > 0 { accessLogData := AccessLogData{data: ef.buildAccessLogData(invoker, invocation), accessLog: accessLog} ef.logIntoChannel(accessLogData) @@ -86,7 +90,7 @@ func (ef *AccessLogFilter) Invoke(ctx context.Context, invoker protocol.Invoker, return invoker.Invoke(ctx, invocation) } -// it won't block the invocation +// logIntoChannel won't block the invocation func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) { select { case ef.logChan <- accessLogData: @@ -97,6 +101,7 @@ func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) { } } +// buildAccessLogData builds the access log data func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation protocol.Invocation) map[string]string { dataMap := make(map[string]string, 16) attachments := invocation.Attachments() @@ -130,11 +135,12 @@ func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation pro return dataMap } -// OnResponse ... +// OnResponse do nothing func (ef *AccessLogFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result { return result } +// writeLogToFile actually write the logs into file func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) { accessLog := data.accessLog if isDefault(accessLog) { @@ -156,6 +162,12 @@ func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) { } } +// openLogFile will open the log file with append mode. +// If the file is not found, it will create the file. +// Actually, the accessLog is the filename +// You may find out that, once we want to write access log into log file, +// we open the file again and again. +// It needs to be optimized. func (ef *AccessLogFilter) openLogFile(accessLog string) (*os.File, error) { logFile, err := os.OpenFile(accessLog, os.O_CREATE|os.O_APPEND|os.O_RDWR, LogFileMode) if err != nil { @@ -169,6 +181,12 @@ func (ef *AccessLogFilter) openLogFile(accessLog string) (*os.File, error) { return nil, err } last := fileInfo.ModTime().Format(FileDateFormat) + + // this is confused. + // for example, if the last = '2020-03-04' + // and today is '2020-03-05' + // we will create one new file to log access data + // By this way, we can split the access log based on days. if now != last { err = os.Rename(fileInfo.Name(), fileInfo.Name()+"."+now) if err != nil { @@ -180,11 +198,12 @@ func (ef *AccessLogFilter) openLogFile(accessLog string) (*os.File, error) { return logFile, err } +// isDefault check whether accessLog == true or accessLog == default func isDefault(accessLog string) bool { return strings.EqualFold("true", accessLog) || strings.EqualFold("default", accessLog) } -// GetAccessLogFilter ... +// GetAccessLogFilter return the instance of AccessLogFilter func GetAccessLogFilter() filter.Filter { accessLogFilter := &AccessLogFilter{logChan: make(chan AccessLogData, LogMaxBuffer)} go func() { @@ -195,12 +214,13 @@ func GetAccessLogFilter() filter.Filter { return accessLogFilter } -// AccessLogData ... +// AccessLogData defines the data that will be log into file type AccessLogData struct { accessLog string data map[string]string } +// toLogMessage convert the AccessLogData to String func (ef *AccessLogData) toLogMessage() string { builder := strings.Builder{} builder.WriteString("[") diff --git a/filter/filter_impl/tps/tps_limiter_method_service.go b/filter/filter_impl/tps/tps_limiter_method_service.go index 7fe8de9237..2d44c688eb 100644 --- a/filter/filter_impl/tps/tps_limiter_method_service.go +++ b/filter/filter_impl/tps/tps_limiter_method_service.go @@ -115,7 +115,12 @@ type MethodServiceTpsLimiterImpl struct { tpsState *concurrent.Map } -// IsAllowable ... +// IsAllowable based on method-level and service-level. +// The method-level has high priority which means that if there is any rate limit configuration for the method, +// the service-level rate limit strategy will be ignored. +// The key point is how to keep thread-safe +// This implementation use concurrent map + loadOrStore to make implementation thread-safe +// You can image that even multiple threads create limiter, but only one could store the limiter into tpsState func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocation protocol.Invocation) bool { methodConfigPrefix := "methods." + invocation.MethodName() + "." @@ -123,23 +128,30 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio methodLimitRateConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "") methodIntervalConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_INTERVAL_KEY, "") + // service-level tps limit limitTarget := url.ServiceKey() // method-level tps limit if len(methodIntervalConfig) > 0 || len(methodLimitRateConfig) > 0 { + // it means that if the method-level rate limit exist, we will use method-level rate limit strategy limitTarget = limitTarget + "#" + invocation.MethodName() } + // looking up the limiter from 'cache' limitState, found := limiter.tpsState.Load(limitTarget) if found { + // the limiter has been cached, we return its result return limitState.(filter.TpsLimitStrategy).IsAllowable() } + // we could not find the limiter, and try to create one. + limitRate := getLimitConfig(methodLimitRateConfig, url, invocation, constant.TPS_LIMIT_RATE_KEY, constant.DEFAULT_TPS_LIMIT_RATE) if limitRate < 0 { + // the limitTarget is not necessary to be limited. return true } @@ -150,13 +162,20 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio panic(fmt.Sprintf("The interval must be positive, please check your configuration! url: %s", url.String())) } + // find the strategy config and then create one limitStrategyConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_STRATEGY_KEY, url.GetParam(constant.TPS_LIMIT_STRATEGY_KEY, constant.DEFAULT_KEY)) limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig) + + // we using loadOrStore to ensure thread-safe limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator.Create(int(limitRate), int(limitInterval))) + return limitState.(filter.TpsLimitStrategy).IsAllowable() } +// getLimitConfig will try to fetch the configuration from url. +// If we can convert the methodLevelConfig to int64, return; +// Or, we will try to look up server-level configuration and then convert it to int64 func getLimitConfig(methodLevelConfig string, url common.URL, invocation protocol.Invocation, @@ -172,6 +191,8 @@ func getLimitConfig(methodLevelConfig string, return result } + // actually there is no method-level configuration, so we use the service-level configuration + result, err := strconv.ParseInt(url.GetParam(configKey, defaultVal), 0, 0) if err != nil { @@ -183,7 +204,7 @@ func getLimitConfig(methodLevelConfig string, var methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl var methodServiceTpsLimiterOnce sync.Once -// GetMethodServiceTpsLimiter ... +// GetMethodServiceTpsLimiter will return an MethodServiceTpsLimiterImpl instance. func GetMethodServiceTpsLimiter() filter.TpsLimiter { methodServiceTpsLimiterOnce.Do(func() { methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{ diff --git a/filter/handler/rejected_execution_handler_only_log.go b/filter/handler/rejected_execution_handler_only_log.go index 0f9003c7df..fe9cf4869f 100644 --- a/filter/handler/rejected_execution_handler_only_log.go +++ b/filter/handler/rejected_execution_handler_only_log.go @@ -36,6 +36,7 @@ const ( ) func init() { + // this implementation is the the default implementation of RejectedExecutionHandler extension.SetRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler) extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyLogRejectedExecutionHandler) } @@ -56,11 +57,12 @@ var onlyLogHandlerOnce sync.Once * tps.limit.rejected.handler: "default" or "log" * methods: * - name: "GetUser" + * OnlyLogRejectedExecutionHandler is designed to be singleton */ type OnlyLogRejectedExecutionHandler struct { } -// RejectedExecution ... +// RejectedExecution will do nothing, it only log the invocation. func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, _ protocol.Invocation) protocol.Result { @@ -68,7 +70,7 @@ func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL return &protocol.RPCResult{} } -// GetOnlyLogRejectedExecutionHandler ... +// GetOnlyLogRejectedExecutionHandler will return the instance of OnlyLogRejectedExecutionHandler func GetOnlyLogRejectedExecutionHandler() filter.RejectedExecutionHandler { onlyLogHandlerOnce.Do(func() { onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{} diff --git a/filter/rejected_execution_handler.go b/filter/rejected_execution_handler.go index caeea1db66..d02481b98d 100644 --- a/filter/rejected_execution_handler.go +++ b/filter/rejected_execution_handler.go @@ -31,5 +31,7 @@ import ( * In such situation, implement this interface and register it by invoking extension.SetRejectedExecutionHandler. */ type RejectedExecutionHandler interface { + + // RejectedExecution will be called if the invocation was rejected by some component. RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result } diff --git a/filter/tps_limit_strategy.go b/filter/tps_limit_strategy.go index 5edf32ce19..e194f1da06 100644 --- a/filter/tps_limit_strategy.go +++ b/filter/tps_limit_strategy.go @@ -33,10 +33,16 @@ package filter * tps.limit.strategy: "name of implementation" # method-level */ type TpsLimitStrategy interface { + // IsAllowable will return true if this invocation is not over limitation IsAllowable() bool } -// TpsLimitStrategyCreator ... +// TpsLimitStrategyCreator, the creator abstraction for TpsLimitStrategy type TpsLimitStrategyCreator interface { - Create(rate int, interval int) TpsLimitStrategy + // Create will create an instance of TpsLimitStrategy + // It will be a little hard to understand this method. + // The unit of interval is ms + // for example, if the limit = 100, interval = 1000 + // which means that the tps limitation is 100 times per 1000ms (100/1000ms) + Create(limit int, interval int) TpsLimitStrategy } diff --git a/filter/tps_limiter.go b/filter/tps_limiter.go index dbc9f76838..531eb09823 100644 --- a/filter/tps_limiter.go +++ b/filter/tps_limiter.go @@ -34,5 +34,6 @@ import ( * tps.limiter: "the name of limiter", */ type TpsLimiter interface { + // IsAllowable will check whether this invocation should be enabled for further process IsAllowable(common.URL, protocol.Invocation) bool } diff --git a/go.mod b/go.mod index 8a599aa55f..490844c929 100644 --- a/go.mod +++ b/go.mod @@ -1,61 +1,52 @@ module github.com/apache/dubbo-go require ( - github.com/Workiva/go-datastructures v1.0.52 + github.com/Workiva/go-datastructures v1.0.50 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 - github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect - github.com/apache/dubbo-go-hessian2 v1.4.0 - github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect + github.com/apache/dubbo-go-hessian2 v1.5.0 github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.13+incompatible github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/creasty/defaults v1.3.0 - github.com/dubbogo/getty v1.3.3 + github.com/dubbogo/getty v1.3.5 github.com/dubbogo/go-zookeeper v1.0.0 - github.com/dubbogo/gost v1.8.0 + github.com/dubbogo/gost v1.9.0 github.com/emicklei/go-restful/v3 v3.0.0 - github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect - github.com/go-errors/errors v1.0.1 // indirect github.com/go-resty/resty/v2 v2.1.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.3.2 github.com/google/btree v1.0.0 // indirect - github.com/gotestyourself/gotestyourself v2.2.0+incompatible github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/hashicorp/consul v1.5.3 github.com/hashicorp/consul/api v1.1.0 - github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect github.com/jinzhu/copier v0.0.0-20190625015134-976e0346caa8 - github.com/jonboulle/clockwork v0.1.0 // indirect - github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect - github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect - github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect + github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 // indirect + github.com/juju/testing v0.0.0-20191001232224-ce9dec17d28b // indirect github.com/magiconair/properties v1.8.1 github.com/mitchellh/mapstructure v1.1.2 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd - github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c + github.com/nacos-group/nacos-sdk-go v0.3.1 github.com/opentracing/opentracing-go v1.1.0 - github.com/pkg/errors v0.8.1 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.1.0 github.com/satori/go.uuid v1.2.0 github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect github.com/soheilhy/cmux v0.1.4 // indirect github.com/stretchr/testify v1.5.1 - github.com/tebeka/strftime v0.1.3 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect - github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8 go.etcd.io/bbolt v1.3.3 // indirect go.etcd.io/etcd v3.3.13+incompatible go.uber.org/atomic v1.4.0 go.uber.org/zap v1.10.0 + golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect google.golang.org/grpc v1.22.1 gopkg.in/yaml.v2 v2.2.2 k8s.io/api v0.0.0-20190325185214-7544f9db76f6 diff --git a/go.sum b/go.sum index 73c3da87a0..326b4e6897 100644 --- a/go.sum +++ b/go.sum @@ -27,8 +27,8 @@ github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc h1:LkkwnbY+S8Wmw github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc/go.mod h1:ARgCUhI1MHQH+ONky/PAtmVHQrP5JlGY0F3poXOp/fA= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= -github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI= -github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= +github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo= +github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af h1:DBNMBMuMiWYu0b+8KMJuWmfCkcxl09JwdlqwDZZ6U14= github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af/go.mod h1:5Jv4cbFiHJMsVxt52+i0Ha45fjshj6wxYr1r19tB9bw= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vajsk51NtYIcwSTkXr+JGrMd36kTDJw= @@ -37,9 +37,11 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= +github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA= +github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk= github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= -github.com/apache/dubbo-go-hessian2 v1.4.0 h1:Cb9FQVTy3G93dnDr7P93U8DeKFYpDTJjQp44JG5TafA= -github.com/apache/dubbo-go-hessian2 v1.4.0/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8= +github.com/apache/dubbo-go-hessian2 v1.5.0 h1:fzulDG5G7nX0ccgKdiN9XipJ7tZ4WXKgmk4stdlDS6s= +github.com/apache/dubbo-go-hessian2 v1.5.0/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -106,14 +108,13 @@ github.com/docker/go-connections v0.3.0 h1:3lOnM9cSzgGwx8VfK/NGOW5fLQ0GjIlCkaktF github.com/docker/go-connections v0.3.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk= github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/dubbogo/getty v1.3.3 h1:8m4zZBqFHO+NmhH7rMPlFuuYRVjcPD7cUhumevqMZZs= -github.com/dubbogo/getty v1.3.3/go.mod h1:U92BDyJ6sW9Jpohr2Vlz8w2uUbIbNZ3d+6rJvFTSPp0= +github.com/dubbogo/getty v1.3.5 h1:xJxdDj9jm7wlrRSsVZSk2TDNxJbbac5GpxV0QpjO+Tw= +github.com/dubbogo/getty v1.3.5/go.mod h1:T55vN8Q6tZjf2AQZiGmkujneD3LfqYbv2b3QjacwYOY= github.com/dubbogo/go-zookeeper v1.0.0 h1:RsYdlGwhDW+iKXM3eIIcvt34P2swLdmQfuIJxsHlGoM= github.com/dubbogo/go-zookeeper v1.0.0/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c= github.com/dubbogo/gost v1.5.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= -github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= -github.com/dubbogo/gost v1.8.0 h1:9ACbQe5OwMjqtinQcNJC5xp16kky27OsfSGw5L9A6vw= -github.com/dubbogo/gost v1.8.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= +github.com/dubbogo/gost v1.9.0 h1:UT+dWwvLyJiDotxJERO75jB3Yxgsdy10KztR5ycxRAk= +github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74 h1:2MIhn2R6oXQbgW5yHfS+d6YqyMfXiu2L55rFZC4UD/M= github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74/go.mod h1:UqXY1lYT/ERa4OEAywUqdok1T4RCRdArkhic1Opuavo= github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y= @@ -323,6 +324,12 @@ github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9 h1:hJix6idebFclqlfZCHE7EUX7uqLCyb70nHNHH1XKGBg= +github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= +github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 h1:UUHMLvzt/31azWTN/ifGWef4WUqvXk0iRqdhdy/2uzI= +github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= +github.com/juju/testing v0.0.0-20191001232224-ce9dec17d28b h1:Rrp0ByJXEjhREMPGTt3aWYjoIsUGCbt21ekbeJcTWv0= +github.com/juju/testing v0.0.0-20191001232224-ce9dec17d28b/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/keybase/go-crypto v0.0.0-20180614160407-5114a9a81e1b h1:VE6r2OwP5gj+Z9aCkSKl3MlmnZbfMAjhvR5T7abKHEo= github.com/keybase/go-crypto v0.0.0-20180614160407-5114a9a81e1b/go.mod h1:ghbZscTyKdM07+Fw3KSi0hcJm+AlEUWj8QLlPtijN/M= @@ -384,6 +391,8 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c h1:WoCa3AvgQMVKNs+RIFlWPRgY9QVJwUxJDrGxHs0fcRo= github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo= +github.com/nacos-group/nacos-sdk-go v0.3.1 h1:MI7bNDAN5m9UFcRRUTSPfJi4dCQo+TYG85qVB1rCHeg= +github.com/nacos-group/nacos-sdk-go v0.3.1/go.mod h1:ESKb6yF0gxSc8GuS+0jaMBe+n8rJ5/k4ya6LyFG2xi8= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h1:TLb2Sg7HQcgGdloNxkrmtgDNR9uVYF3lfdFIN4Ro6Sk= github.com/oklog/run v0.0.0-20180308005104-6934b124db28 h1:Hbr3fbVPXea52oPQeP7KLSxP52g6SFaNY1IqAmUyEW0= @@ -417,6 +426,8 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/metadata/mapping/dynamic/service_name_mapping.go b/metadata/mapping/dynamic/service_name_mapping.go index 4cfac8f828..5e40692905 100644 --- a/metadata/mapping/dynamic/service_name_mapping.go +++ b/metadata/mapping/dynamic/service_name_mapping.go @@ -19,6 +19,7 @@ package dynamic import ( "strconv" + "sync" "time" ) @@ -28,7 +29,9 @@ import ( ) import ( + env "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/metadata/mapping" @@ -37,9 +40,16 @@ import ( const ( defaultGroup = config_center.DEFAULT_GROUP slash = "/" + name = "dynamic" ) +func init() { + extension.SetServiceNameMapping(name, GetServiceNameMappingInstance) + extension.SetServiceNameMapping(constant.DEFAULT_KEY, GetServiceNameMappingInstance) +} + // DynamicConfigurationServiceNameMapping is the implementation based on config center +// It could be thought as singleton pattern. type DynamicConfigurationServiceNameMapping struct { dc config_center.DynamicConfiguration } @@ -76,7 +86,22 @@ func (d *DynamicConfigurationServiceNameMapping) buildGroup(serviceInterface str return defaultGroup + slash + serviceInterface } -// NewServiceNameMapping will create an instance of DynamicConfigurationServiceNameMapping -func NewServiceNameMapping(dc config_center.DynamicConfiguration) mapping.ServiceNameMapping { +var ( + instance *DynamicConfigurationServiceNameMapping + initOnce sync.Once +) + +// newServiceNameMapping will create an instance of DynamicConfigurationServiceNameMapping +func newServiceNameMapping(dc config_center.DynamicConfiguration) *DynamicConfigurationServiceNameMapping { return &DynamicConfigurationServiceNameMapping{dc: dc} } + +// GetServiceNameMappingInstance will return the instance. +// If the instance is not initiated, it will create one +func GetServiceNameMappingInstance() mapping.ServiceNameMapping { + initOnce.Do(func() { + dc := env.GetEnvInstance().GetDynamicConfiguration() + instance = newServiceNameMapping(dc) + }) + return instance +} diff --git a/metadata/mapping/dynamic/service_name_mapping_test.go b/metadata/mapping/dynamic/service_name_mapping_test.go index e3d620cd73..31aaf3f11c 100644 --- a/metadata/mapping/dynamic/service_name_mapping_test.go +++ b/metadata/mapping/dynamic/service_name_mapping_test.go @@ -41,7 +41,7 @@ func TestDynamicConfigurationServiceNameMapping(t *testing.T) { }).GetDynamicConfiguration(nil) config.GetApplicationConfig().Name = appName - mapping := NewServiceNameMapping(dc) + mapping := newServiceNameMapping(dc) intf := constant.METADATA_SERVICE_NAME group := "myGroup" version := "myVersion" diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 5ec7db51af..e6ffa64d80 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -88,7 +88,7 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// SetClientConf ... +// SetClientConf set dubbo client config. func SetClientConf(c ClientConfig) { clientConf = &c err := clientConf.CheckValidity() @@ -99,7 +99,7 @@ func SetClientConf(c ClientConfig) { setClientGrpool() } -// GetClientConf ... +// GetClientConf get dubbo client config. func GetClientConf() ClientConfig { return *clientConf } @@ -129,7 +129,7 @@ type AsyncCallbackResponse struct { Reply interface{} } -// Client ... +// Client is dubbo protocol client. type Client struct { opts Options conf ClientConfig @@ -139,7 +139,7 @@ type Client struct { pendingResponses *sync.Map } -// NewClient ... +// NewClient create a new Client. func NewClient(opt Options) *Client { switch { @@ -167,7 +167,7 @@ func NewClient(opt Options) *Client { return c } -// Request ... +// Request is dubbo protocol request. type Request struct { addr string svcUrl common.URL @@ -176,7 +176,7 @@ type Request struct { atta map[string]string } -// NewRequest ... +// NewRequest create a new Request. func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request { return &Request{ addr: addr, @@ -187,13 +187,13 @@ func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, } } -// Response ... +// Response is dubbo protocol response. type Response struct { reply interface{} atta map[string]string } -// NewResponse ... +// NewResponse create a new Response. func NewResponse(reply interface{}, atta map[string]string) *Response { return &Response{ reply: reply, @@ -201,15 +201,14 @@ func NewResponse(reply interface{}, atta map[string]string) *Response { } } -// CallOneway call one way +// CallOneway call by one way func (c *Client) CallOneway(request *Request) error { return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil)) } -// Call if @response is nil, the transport layer will get the response without notify the invoker. +// Call call remoting by two way or one way, if @response.reply is nil, the way of call is one way. func (c *Client) Call(request *Request, response *Response) error { - ct := CT_TwoWay if response.reply == nil { ct = CT_OneWay @@ -218,14 +217,12 @@ func (c *Client) Call(request *Request, response *Response) error { return perrors.WithStack(c.call(ct, request, response, nil)) } -// AsyncCall ... +// AsyncCall call remoting by async with callback. func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error { - return perrors.WithStack(c.call(CT_TwoWay, request, response, callback)) } func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error { - p := &DubboPackage{} p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/") p.Service.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "") @@ -293,7 +290,7 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac return perrors.WithStack(err) } -// Close ... +// Close close the client pool. func (c *Client) Close() { if c.pool != nil { c.pool.close() diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 76416b2baf..620a57d4a7 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -69,7 +69,7 @@ func (p DubboPackage) String() string { return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body) } -// Marshal ... +// Marshal encode hessian package. func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { codec := hessian.NewHessianCodec(nil) @@ -81,7 +81,7 @@ func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { return bytes.NewBuffer(pkg), nil } -// Unmarshal ... +// Unmarshal dncode hessian package. func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { // fix issue https://github.com/apache/dubbo-go/issues/380 bufLen := buf.Len() @@ -125,7 +125,7 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { // PendingResponse //////////////////////////////////////////// -// PendingResponse ... +// PendingResponse is a pending response. type PendingResponse struct { seq uint64 err error @@ -136,7 +136,7 @@ type PendingResponse struct { done chan struct{} } -// NewPendingResponse ... +// NewPendingResponse create a PendingResponses. func NewPendingResponse() *PendingResponse { return &PendingResponse{ start: time.Now(), @@ -145,7 +145,7 @@ func NewPendingResponse() *PendingResponse { } } -// GetCallResponse ... +// GetCallResponse get AsyncCallbackResponse. func (r PendingResponse) GetCallResponse() common.CallbackResponse { return AsyncCallbackResponse{ Cause: r.err, diff --git a/protocol/dubbo/config.go b/protocol/dubbo/config.go index dbc6989c54..6a1daf857a 100644 --- a/protocol/dubbo/config.go +++ b/protocol/dubbo/config.go @@ -147,7 +147,7 @@ func GetDefaultServerConfig() ServerConfig { } } -// CheckValidity ... +// CheckValidity confirm getty sessian params. func (c *GettySessionParam) CheckValidity() error { var err error @@ -170,7 +170,7 @@ func (c *GettySessionParam) CheckValidity() error { return nil } -// CheckValidity ... +// CheckValidity confirm client params. func (c *ClientConfig) CheckValidity() error { var err error @@ -192,7 +192,7 @@ func (c *ClientConfig) CheckValidity() error { return perrors.WithStack(c.GettySessionParam.CheckValidity()) } -// CheckValidity ... +// CheckValidity confirm server params. func (c *ServerConfig) CheckValidity() error { var err error diff --git a/protocol/dubbo/dubbo_exporter.go b/protocol/dubbo/dubbo_exporter.go index 1c45c40056..dd80937c5b 100644 --- a/protocol/dubbo/dubbo_exporter.go +++ b/protocol/dubbo/dubbo_exporter.go @@ -28,19 +28,19 @@ import ( "github.com/apache/dubbo-go/protocol" ) -// DubboExporter ... +// DubboExporter is dubbo service exporter. type DubboExporter struct { protocol.BaseExporter } -// NewDubboExporter ... +// NewDubboExporter get a DubboExporter. func NewDubboExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *DubboExporter { return &DubboExporter{ BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap), } } -// Unexport ... +// Unexport unexport dubbo service exporter. func (de *DubboExporter) Unexport() { serviceId := de.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "") interfaceName := de.GetInvoker().GetUrl().GetParam(constant.INTERFACE_KEY, "") diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 09c3725710..59202d5f49 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -39,8 +39,9 @@ import ( ) var ( - // ErrNoReply ... - ErrNoReply = perrors.New("request need @response") + // ErrNoReply + ErrNoReply = perrors.New("request need @response") + // ErrDestroyedInvoker ErrDestroyedInvoker = perrors.New("request Destroyed invoker") ) @@ -48,7 +49,7 @@ var ( attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY} ) -// DubboInvoker ... +// DubboInvoker is dubbo client invoker. type DubboInvoker struct { protocol.BaseInvoker client *Client @@ -57,7 +58,7 @@ type DubboInvoker struct { reqNum int64 } -// NewDubboInvoker ... +// NewDubboInvoker create dubbo client invoker. func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { return &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), @@ -66,7 +67,7 @@ func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { } } -// Invoke ... +// Invoke call remoting. func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( err error @@ -122,7 +123,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati return &result } -// Destroy ... +// Destroy destroy dubbo client invoker. func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { for { diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 355dbc8024..b7d0a8a268 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -45,14 +45,14 @@ var ( dubboProtocol *DubboProtocol ) -// DubboProtocol ... +// DubboProtocol is a dubbo protocol implement. type DubboProtocol struct { protocol.BaseProtocol serverMap map[string]*Server serverLock sync.Mutex } -// NewDubboProtocol ... +// NewDubboProtocol create a dubbo protocol. func NewDubboProtocol() *DubboProtocol { return &DubboProtocol{ BaseProtocol: protocol.NewBaseProtocol(), @@ -60,7 +60,7 @@ func NewDubboProtocol() *DubboProtocol { } } -// Export ... +// Export export dubbo service. func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { url := invoker.GetUrl() serviceKey := url.ServiceKey() @@ -73,7 +73,7 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { return exporter } -// Refer ... +// Refer create dubbo service reference. func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { //default requestTimeout var requestTimeout = config.GetConsumerConfig().RequestTimeout @@ -92,7 +92,7 @@ func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { return invoker } -// Destroy ... +// Destroy destroy dubbo service. func (dp *DubboProtocol) Destroy() { logger.Infof("DubboProtocol destroy.") @@ -124,7 +124,7 @@ func (dp *DubboProtocol) openServer(url common.URL) { } } -// GetProtocol ... +// GetProtocol get a single dubbo protocol. func GetProtocol() protocol.Protocol { if dubboProtocol == nil { dubboProtocol = NewDubboProtocol() diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 0251b78a2b..1f4cc0068e 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -86,7 +86,7 @@ func (h *RpcClientHandler) OnOpen(session getty.Session) error { // OnError ... func (h *RpcClientHandler) OnError(session getty.Session, err error) { - logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err) + logger.Warnf("session{%s} got error{%v}, will be closed.", session.Stat(), err) h.conn.removeSession(session) } @@ -201,7 +201,7 @@ func (h *RpcServerHandler) OnOpen(session getty.Session) error { // OnError ... func (h *RpcServerHandler) OnError(session getty.Session, err error) { - logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err) + logger.Warnf("session{%s} got error{%v}, will be closed.", session.Stat(), err) h.rwlock.Lock() delete(h.sessionMap, session) h.rwlock.Unlock() diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index b5c4f50919..9cc7ea25cd 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -38,16 +38,17 @@ import ( // RpcClientPackageHandler //////////////////////////////////////////// -// RpcClientPackageHandler ... +// RpcClientPackageHandler handle package for client in getty. type RpcClientPackageHandler struct { client *Client } -// NewRpcClientPackageHandler ... +// NewRpcClientPackageHandler create a RpcClientPackageHandler. func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { return &RpcClientPackageHandler{client: client} } +// Read decode @data to DubboPackage. func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { pkg := &DubboPackage{} @@ -72,6 +73,7 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } +// Write encode @pkg. func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { req, ok := pkg.(*DubboPackage) if !ok { @@ -96,9 +98,10 @@ var ( rpcServerPkgHandler = &RpcServerPackageHandler{} ) -// RpcServerPackageHandler ... +// RpcServerPackageHandler handle package for server in getty. type RpcServerPackageHandler struct{} +// Read decode @data to DubboPackage. func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { pkg := &DubboPackage{ Body: make([]interface{}, 7), @@ -169,6 +172,7 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } +// Write encode @pkg. func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { res, ok := pkg.(*DubboPackage) if !ok { diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go index bd2b37b7a9..8de353a0b3 100644 --- a/protocol/dubbo/server.go +++ b/protocol/dubbo/server.go @@ -71,10 +71,10 @@ func init() { if err := srvConf.CheckValidity(); err != nil { panic(err) } - SetServerGrpool() + setServerGrpool() } -// SetServerConfig ... +// SetServerConfig set dubbo server config. func SetServerConfig(s ServerConfig) { srvConf = &s err := srvConf.CheckValidity() @@ -82,30 +82,29 @@ func SetServerConfig(s ServerConfig) { logger.Warnf("[ServerConfig CheckValidity] error: %v", err) return } - SetServerGrpool() + setServerGrpool() } -// GetServerConfig ... +// GetServerConfig get dubbo server config. func GetServerConfig() ServerConfig { return *srvConf } -// SetServerGrpool ... -func SetServerGrpool() { +func setServerGrpool() { if srvConf.GrPoolSize > 1 { srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen), gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber)) } } -// Server ... +// Server is dubbo protocol server. type Server struct { conf ServerConfig tcpServer getty.Server rpcHandler *RpcServerHandler } -// NewServer ... +// NewServer create a new Server. func NewServer() *Server { s := &Server{ @@ -156,7 +155,7 @@ func (s *Server) newSession(session getty.Session) error { return nil } -// Start ... +// Start start dubbo server. func (s *Server) Start(url common.URL) { var ( addr string @@ -173,7 +172,7 @@ func (s *Server) Start(url common.URL) { } -// Stop ... +// Stop stop dubbo server. func (s *Server) Stop() { s.tcpServer.Close() } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 20be268d74..552aa57061 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -19,7 +19,6 @@ package directory import ( "sync" - "time" ) import ( @@ -28,6 +27,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -42,15 +42,11 @@ import ( "github.com/apache/dubbo-go/remoting" ) -// Options ... -type Options struct { - serviceTTL time.Duration +func init() { + extension.SetDefaultRegistryDirectory(NewRegistryDirectory) } -// Option ... -type Option func(*Options) - -type registryDirectory struct { +type RegistryDirectory struct { directory.BaseDirectory cacheInvokers []protocol.Invoker listenerLock sync.Mutex @@ -61,48 +57,41 @@ type registryDirectory struct { configurators []config_center.Configurator consumerConfigurationListener *consumerConfigurationListener referenceConfigurationListener *referenceConfigurationListener - Options - serviceKey string - forbidden atomic.Bool + serviceKey string + forbidden atomic.Bool } // NewRegistryDirectory ... -func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) { - options := Options{ - //default 300s - serviceTTL: time.Duration(300e9), - } - for _, opt := range opts { - opt(&options) - } +func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) { if url.SubURL == nil { return nil, perrors.Errorf("url is invalid, suburl can not be nil") } - dir := ®istryDirectory{ + dir := &RegistryDirectory{ BaseDirectory: directory.NewBaseDirectory(url), cacheInvokers: []protocol.Invoker{}, cacheInvokersMap: &sync.Map{}, serviceType: url.SubURL.Service(), registry: registry, - Options: options, } dir.consumerConfigurationListener = newConsumerConfigurationListener(dir) + + go dir.subscribe(url.SubURL) return dir, nil } //subscribe from registry -func (dir *registryDirectory) Subscribe(url *common.URL) { +func (dir *RegistryDirectory) subscribe(url *common.URL) { dir.consumerConfigurationListener.addNotifyListener(dir) dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) dir.registry.Subscribe(url, dir) } -func (dir *registryDirectory) Notify(event *registry.ServiceEvent) { +func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) { go dir.update(event) } -//subscribe service from registry, and update the cacheServices -func (dir *registryDirectory) update(res *registry.ServiceEvent) { +// update the cacheServices and subscribe service from registry +func (dir *RegistryDirectory) update(res *registry.ServiceEvent) { if res == nil { return } @@ -111,7 +100,7 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { dir.refreshInvokers(res) } -func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { +func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) { var ( url *common.URL oldInvoker protocol.Invoker = nil @@ -162,7 +151,7 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { } -func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { +func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { newInvokersList := []protocol.Invoker{} groupInvokersMap := make(map[string][]protocol.Invoker) groupInvokersList := []protocol.Invoker{} @@ -198,8 +187,8 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { return groupInvokersList } -// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil -func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { +// uncacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil +func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key()) if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok { dir.cacheInvokersMap.Delete(url.Key()) @@ -208,8 +197,8 @@ func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { return nil } -// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil -func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { +// cacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil +func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { dir.overrideUrl(dir.GetDirectoryUrl()) referenceUrl := dir.GetDirectoryUrl().SubURL @@ -244,8 +233,8 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { return nil } -//select the protocol invokers from the directory -func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { +// List selected protocol invokers from the directory +func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { invokers := dir.cacheInvokers routerChain := dir.RouterChain() @@ -255,7 +244,7 @@ func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.In return routerChain.Route(invokers, dir.cacheOriginUrl, invocation) } -func (dir *registryDirectory) IsAvailable() bool { +func (dir *RegistryDirectory) IsAvailable() bool { if !dir.BaseDirectory.IsAvailable() { return dir.BaseDirectory.IsAvailable() } @@ -269,7 +258,7 @@ func (dir *registryDirectory) IsAvailable() bool { return false } -func (dir *registryDirectory) Destroy() { +func (dir *RegistryDirectory) Destroy() { //TODO:unregister & unsubscribe dir.BaseDirectory.Destroy(func() { invokers := dir.cacheInvokers @@ -280,7 +269,7 @@ func (dir *registryDirectory) Destroy() { }) } -func (dir *registryDirectory) overrideUrl(targetUrl *common.URL) { +func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) { doOverrideUrl(dir.configurators, targetUrl) doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl) doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl) @@ -294,11 +283,11 @@ func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common type referenceConfigurationListener struct { registry.BaseConfigurationListener - directory *registryDirectory + directory *RegistryDirectory url *common.URL } -func newReferenceConfigurationListener(dir *registryDirectory, url *common.URL) *referenceConfigurationListener { +func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener { listener := &referenceConfigurationListener{directory: dir, url: url} listener.InitWith( url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX, @@ -316,10 +305,10 @@ func (l *referenceConfigurationListener) Process(event *config_center.ConfigChan type consumerConfigurationListener struct { registry.BaseConfigurationListener listeners []registry.NotifyListener - directory *registryDirectory + directory *RegistryDirectory } -func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigurationListener { +func newConsumerConfigurationListener(dir *RegistryDirectory) *consumerConfigurationListener { listener := &consumerConfigurationListener{directory: dir} listener.InitWith( config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX, diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 0dde44e73c..f1d5ce434a 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -79,10 +79,9 @@ func TestSubscribe_Group(t *testing.T) { suburl.SetParam(constant.CLUSTER_KEY, "mock") regurl.SubURL = &suburl mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) - registryDirectory, _ := NewRegistryDirectory(®url, mockRegistry) - - go registryDirectory.Subscribe(common.NewURLWithOptions(common.WithPath("testservice"))) + dir, _ := NewRegistryDirectory(®url, mockRegistry) + go dir.(*RegistryDirectory).subscribe(common.NewURLWithOptions(common.WithPath("testservice"))) //for group1 urlmap := url.Values{} urlmap.Set(constant.GROUP_KEY, "group1") @@ -101,7 +100,7 @@ func TestSubscribe_Group(t *testing.T) { } time.Sleep(1e9) - assert.Len(t, registryDirectory.cacheInvokers, 2) + assert.Len(t, dir.(*RegistryDirectory).cacheInvokers, 2) } func Test_Destroy(t *testing.T) { @@ -173,7 +172,7 @@ Loop1: } -func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockRegistry) { +func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) { extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) url, _ := common.NewURL("mock://127.0.0.1:1111") @@ -185,9 +184,9 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR ) url.SubURL = &suburl mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) - registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) + dir, _ := NewRegistryDirectory(&url, mockRegistry) - go registryDirectory.Subscribe(&suburl) + go dir.(*RegistryDirectory).subscribe(&suburl) if len(noMockEvent) == 0 { for i := 0; i < 3; i++ { mockRegistry.(*registry.MockRegistry).MockEvent( @@ -201,5 +200,5 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR ) } } - return registryDirectory, mockRegistry.(*registry.MockRegistry) + return dir.(*RegistryDirectory), mockRegistry.(*registry.MockRegistry) } diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 7d3406cac2..fbd84ac44d 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -237,7 +237,7 @@ func (n *nacosServiceDiscovery) DispatchEventForInstances(serviceName string, in // DispatchEvent will dispatch the event func (n *nacosServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { - // TODO(waiting for event dispatcher, another task) + extension.GetGlobalDispatcher().Dispatch(event) return nil } diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go index a756e86693..0ac46cb9a2 100644 --- a/registry/nacos/service_discovery_test.go +++ b/registry/nacos/service_discovery_test.go @@ -30,19 +30,28 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/common/observer/dispatcher" "github.com/apache/dubbo-go/registry" ) func TestNacosServiceDiscovery_Destroy(t *testing.T) { - serviceDiscovry, err := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) + serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) assert.Nil(t, err) - assert.NotNil(t, serviceDiscovry) - err = serviceDiscovry.Destroy() + assert.NotNil(t, serviceDiscovery) + err = serviceDiscovery.Destroy() assert.Nil(t, err) - assert.Nil(t, serviceDiscovry.(*nacosServiceDiscovery).namingClient) + assert.Nil(t, serviceDiscovery.(*nacosServiceDiscovery).namingClient) } func TestNacosServiceDiscovery_CRUD(t *testing.T) { + + extension.SetEventDispatcher("mock", func() observer.EventDispatcher { + return &dispatcher.MockEventDispatcher{} + }) + + extension.SetAndInitGlobalDispatcher("mock") + serviceName := "service-name" id := "id" host := "host" diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index b47b9f372d..52a7dcbfc7 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -24,7 +24,7 @@ import ( ) import ( - gxset "github.com/dubbogo/gost/container/set" + "github.com/dubbogo/gost/container/set" ) import ( @@ -39,13 +39,18 @@ import ( "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" - directory2 "github.com/apache/dubbo-go/registry/directory" + _ "github.com/apache/dubbo-go/registry/directory" "github.com/apache/dubbo-go/remoting" ) var ( - regProtocol *registryProtocol - once sync.Once + regProtocol *registryProtocol + once sync.Once + reserveParams = []string{ + "application", "codec", "exchanger", "serialization", "cluster", "connections", "deprecated", "group", + "loadbalance", "mock", "path", "timeout", "token", "version", "warmup", "weight", "timestamp", "dubbo", + "release", "interface", + } ) type registryProtocol struct { @@ -88,6 +93,13 @@ func getRegistry(regUrl *common.URL) registry.Registry { return reg } +func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common.URL { + if registryUrl.GetParamBool("simplified", false) { + return providerUrl.CloneWithParams(reserveParams) + } + return providerUrl +} + func (proto *registryProtocol) initConfigurationListeners() { proto.overrideListeners = &sync.Map{} proto.serviceConfigurationListeners = &sync.Map{} @@ -112,7 +124,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { } //new registry directory for store service url from registry - directory, err := directory2.NewRegistryDirectory(®istryUrl, reg) + directory, err := extension.GetDefaultRegistryDirectory(®istryUrl, reg) if err != nil { logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", serviceUrl.String(), err.Error()) @@ -124,7 +136,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { logger.Errorf("consumer service %v register registry %v error, error message is %s", serviceUrl.String(), registryUrl.String(), err.Error()) } - go directory.Subscribe(serviceUrl) //new cluster invoker cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) @@ -151,7 +162,6 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte serviceConfigurationListener.OverrideUrl(providerUrl) var reg registry.Registry - if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { reg = getRegistry(registryUrl) proto.registries.Store(registryUrl.Key(), reg) @@ -159,7 +169,8 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte reg = regI.(registry.Registry) } - err := reg.Register(*providerUrl) + registeredProviderUrl := getUrlToRegistry(providerUrl, registryUrl) + err := reg.Register(*registeredProviderUrl) if err != nil { logger.Errorf("provider service %v register registry %v error, error message is %s", providerUrl.Key(), registryUrl.Key(), err.Error()) diff --git a/registry/service/instance/random/random_service_instance_selector.go b/registry/servicediscovery/instance/random/random_service_instance_selector.go similarity index 96% rename from registry/service/instance/random/random_service_instance_selector.go rename to registry/servicediscovery/instance/random/random_service_instance_selector.go index 47f84c76c8..1efbbc3fcb 100644 --- a/registry/service/instance/random/random_service_instance_selector.go +++ b/registry/servicediscovery/instance/random/random_service_instance_selector.go @@ -21,7 +21,7 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/registry" - "github.com/apache/dubbo-go/registry/service/instance" + "github.com/apache/dubbo-go/registry/servicediscovery/instance" "math/rand" "time" ) diff --git a/registry/service/instance/random/random_service_instance_selector_test.go b/registry/servicediscovery/instance/random/random_service_instance_selector_test.go similarity index 100% rename from registry/service/instance/random/random_service_instance_selector_test.go rename to registry/servicediscovery/instance/random/random_service_instance_selector_test.go diff --git a/registry/service/instance/service_instance_selector.go b/registry/servicediscovery/instance/service_instance_selector.go similarity index 100% rename from registry/service/instance/service_instance_selector.go rename to registry/servicediscovery/instance/service_instance_selector.go diff --git a/registry/service/proxy/metadata_service_proxy_factory.go b/registry/servicediscovery/proxy/metadata_service_proxy_factory.go similarity index 100% rename from registry/service/proxy/metadata_service_proxy_factory.go rename to registry/servicediscovery/proxy/metadata_service_proxy_factory.go diff --git a/registry/service/proxy/service_proxy.go b/registry/servicediscovery/proxy/service_proxy.go similarity index 100% rename from registry/service/proxy/service_proxy.go rename to registry/servicediscovery/proxy/service_proxy.go diff --git a/registry/service/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go similarity index 96% rename from registry/service/service_discovery_registry.go rename to registry/servicediscovery/service_discovery_registry.go index b3c9e6a632..1734899f8b 100644 --- a/registry/service/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package service +package servicediscovery import ( "bytes" @@ -29,8 +29,8 @@ import ( "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/metadata/service/inmemory" "github.com/apache/dubbo-go/registry" - "github.com/apache/dubbo-go/registry/service/proxy" - "github.com/apache/dubbo-go/registry/service/synthesizer" + "github.com/apache/dubbo-go/registry/servicediscovery/proxy" + "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer" "github.com/apache/dubbo-go/remoting" gxset "github.com/dubbogo/gost/container/set" "strconv" @@ -39,13 +39,19 @@ import ( ) const ( - SERVICE_REGISTRY_PROTOCOL = "service-discovery-registry" + protocolName = "service-discovery" ) func init() { - extension.SetRegistry(SERVICE_REGISTRY_PROTOCOL, newServiceDiscoveryRegistry) + extension.SetRegistry(protocolName, newServiceDiscoveryRegistry) } +// serviceDiscoveryRegistry is the implementation of application-level registry. +// It's completely different from other registry implementations +// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping +// In order to keep compatible with interface-level registry, +// 1. when we registry the service, we should create the mapping from service name to application name +// 2. when we sub type serviceDiscoveryRegistry struct { lock sync.RWMutex url *common.URL diff --git a/registry/service/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go similarity index 98% rename from registry/service/service_discovery_registry_test.go rename to registry/servicediscovery/service_discovery_registry_test.go index 41b681d991..90a8c0e257 100644 --- a/registry/service/service_discovery_registry_test.go +++ b/registry/servicediscovery/service_discovery_registry_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package service +package servicediscovery import ( "testing" diff --git a/registry/service/synthesizer/rest/rest_subscribed_urls_synthesizer.go b/registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer.go similarity index 96% rename from registry/service/synthesizer/rest/rest_subscribed_urls_synthesizer.go rename to registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer.go index a7e7a5bc30..e09dfd6be7 100644 --- a/registry/service/synthesizer/rest/rest_subscribed_urls_synthesizer.go +++ b/registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer.go @@ -21,7 +21,7 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/registry" - "github.com/apache/dubbo-go/registry/service/synthesizer" + "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer" "net/url" "strings" ) diff --git a/registry/service/synthesizer/rest/rest_subscribed_urls_synthesizer_test.go b/registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer_test.go similarity index 100% rename from registry/service/synthesizer/rest/rest_subscribed_urls_synthesizer_test.go rename to registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer_test.go diff --git a/registry/service/synthesizer/subscribed_urls_synthesizer.go b/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer.go similarity index 100% rename from registry/service/synthesizer/subscribed_urls_synthesizer.go rename to registry/servicediscovery/synthesizer/subscribed_urls_synthesizer.go diff --git a/registry/service/synthesizer/subscribed_urls_synthesizer_factory.go b/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer_factory.go similarity index 100% rename from registry/service/synthesizer/subscribed_urls_synthesizer_factory.go rename to registry/servicediscovery/synthesizer/subscribed_urls_synthesizer_factory.go