diff --git a/common/constant/key.go b/common/constant/key.go index 4d2664baf9..3441b341d0 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -87,3 +87,15 @@ const ( ProviderConfigPrefix = "dubbo.provider." ConsumerConfigPrefix = "dubbo.consumer." ) + +const ( + NACOS_KEY = "nacos" + NACOS_DEFAULT_ROLETYPE = 3 + NACOS_CACHE_DIR_KEY = "cacheDir" + NACOS_LOG_DIR_KEY = "logDir" + NACOS_ENDPOINT = "endpoint" + NACOS_SERVICE_NAME_SEPARATOR = ":" + NACOS_CATEGORY_KEY = "category" + NACOS_PROTOCOL_KEY = "protocol" + NACOS_PATH_KEY = "path" +) diff --git a/config/reference_config.go b/config/reference_config.go index 67a4a3d816..f90e3aabd3 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -158,6 +158,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.GROUP_KEY, refconfig.Group) urlMap.Set(constant.VERSION_KEY, refconfig.Version) urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) //getty invoke async or sync urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async)) diff --git a/config/registry_config.go b/config/registry_config.go index 3d54c348aa..0abdab810f 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -36,9 +36,10 @@ type RegistryConfig struct { TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second Group string `yaml:"group" json:"group,omitempty" property:"group"` //for registry - Address string `yaml:"address" json:"address,omitempty" property:"address"` - Username string `yaml:"username" json:"username,omitempty" property:"username"` - Password string `yaml:"password" json:"password,omitempty" property:"password"` + Address string `yaml:"address" json:"address,omitempty" property:"address"` + Username string `yaml:"username" json:"username,omitempty" property:"username"` + Password string `yaml:"password" json:"password,omitempty" property:"password"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` } func (*RegistryConfig) Prefix() string { @@ -109,6 +110,8 @@ func (regconfig *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType))) urlMap.Set(constant.REGISTRY_KEY, regconfig.Protocol) urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, regconfig.TimeoutStr) - + for k, v := range regconfig.Params { + urlMap.Set(k, v) + } return urlMap } diff --git a/config/service_config.go b/config/service_config.go index 76913319f6..8b4a7d1b22 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -163,6 +163,7 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(srvconfig.Retries, 10)) urlMap.Set(constant.GROUP_KEY, srvconfig.Group) urlMap.Set(constant.VERSION_KEY, srvconfig.Version) + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) //application info urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name) urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization) diff --git a/go.mod b/go.mod index c25f0bd2d7..cc474591d6 100644 --- a/go.mod +++ b/go.mod @@ -2,15 +2,26 @@ module github.com/apache/dubbo-go require ( github.com/Workiva/go-datastructures v1.0.50 + github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect github.com/dubbogo/getty v1.2.2 github.com/dubbogo/gost v1.1.1 + github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect + github.com/go-errors/errors v1.0.1 // indirect github.com/golang/mock v1.3.1 + github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect + github.com/jonboulle/clockwork v0.1.0 // indirect + github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect + github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect + github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect github.com/magiconair/properties v1.8.1 + github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/stretchr/testify v1.3.0 + github.com/tebeka/strftime v0.1.3 // indirect + github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect go.uber.org/atomic v1.4.0 go.uber.org/zap v1.10.0 gopkg.in/yaml.v2 v2.2.2 diff --git a/go.sum b/go.sum index 39b2c6ea35..bad387e299 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,13 @@ github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo= github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= +github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0= +github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= +github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 h1:7zJlM+8bpCAUhv03TZnXkT4MLlLWng1s7An8CLuN73E= github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= +github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= +github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 h1:D21IyuvjDCshj1/qq+pCNd3VZOAEI9jy6Bi131YlXgI= +github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -10,23 +16,69 @@ github.com/dubbogo/getty v1.2.2 h1:qDC9WXjxcs5NPvWZz2ruVKBKr2r1Jjm6i0Sq//CQwbE= github.com/dubbogo/getty v1.2.2/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU= github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= +github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU= +github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw= +github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= +github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= +github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4= +github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= +github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/json-iterator/go v1.1.5 h1:gL2yXlmiIo4+t+y32d4WGwOjKGYcGOuyrg46vadswDE= +github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 h1:0iQektZGS248WXmGIYOwRXSQhD4qn3icjMpuxwO7qlo= +github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE= +github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f h1:sgUSP4zdTUZYZgAGGtN5Lxk92rK+JUFOwf+FT99EEI4= +github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod h1:UGmTpUd3rjbtfIpwAPrcfmGf/Z1HS95TATB+m57TPB8= +github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 h1:Bvq8AziQ5jFF4BHGAEDSqwPW1NJS3XshxbRCxtjFAZc= +github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb h1:lbmvw8r9W55w+aQgWn35W1nuleRIECMoqUrmwAOAvoI= +github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY= github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs= +github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto= +github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ= +github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D4Jj5eZ41Zm3IH/J8OElK1Qtd7tVKAwLk= +github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= @@ -42,9 +94,16 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk= +gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go new file mode 100644 index 0000000000..c42abd0bb1 --- /dev/null +++ b/registry/nacos/listener.go @@ -0,0 +1,199 @@ +package nacos + +import ( + "bytes" + "net/url" + "reflect" + "strconv" + "sync" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + "github.com/nacos-group/nacos-sdk-go/model" + "github.com/nacos-group/nacos-sdk-go/vo" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting" +) + +type nacosListener struct { + namingClient naming_client.INamingClient + listenUrl common.URL + events chan *remoting.ConfigChangeEvent + instanceMap map[string]model.Instance + cacheLock sync.Mutex + done chan struct{} + subscribeParam *vo.SubscribeParam +} + +func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) { + listener := &nacosListener{ + namingClient: namingClient, + listenUrl: url, events: make(chan *remoting.ConfigChangeEvent, 32), + instanceMap: map[string]model.Instance{}, + done: make(chan struct{}), + } + err := listener.startListen() + return listener, err +} + +func generateInstance(ss model.SubscribeService) model.Instance { + return model.Instance{ + InstanceId: ss.InstanceId, + Ip: ss.Ip, + Port: ss.Port, + ServiceName: ss.ServiceName, + Valid: ss.Valid, + Enable: ss.Enable, + Weight: ss.Weight, + Metadata: ss.Metadata, + ClusterName: ss.ClusterName, + } +} + +func generateUrl(instance model.Instance) *common.URL { + if instance.Metadata == nil { + logger.Errorf("nacos instance metadata is empty,instance:%+v", instance) + return nil + } + path := instance.Metadata["path"] + myInterface := instance.Metadata["interface"] + if len(path) == 0 && len(myInterface) == 0 { + logger.Errorf("nacos instance metadata does not have both path key and interface key,instance:%+v", instance) + return nil + } + if len(path) == 0 && len(myInterface) != 0 { + path = "/" + myInterface + } + protocol := instance.Metadata["protocol"] + if len(protocol) == 0 { + logger.Errorf("nacos instance metadata does not have protocol key,instance:%+v", instance) + return nil + } + urlMap := url.Values{} + for k, v := range instance.Metadata { + urlMap.Set(k, v) + } + return common.NewURLWithOptions( + common.WithIp(instance.Ip), + common.WithPort(strconv.Itoa(int(instance.Port))), + common.WithProtocol(protocol), + common.WithParams(urlMap), + common.WithPath(path), + ) +} + +func (nl *nacosListener) Callback(services []model.SubscribeService, err error) { + if err != nil { + logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam) + return + } + nl.cacheLock.Lock() + defer nl.cacheLock.Unlock() + addInstances := make([]model.Instance, 0, len(services)) + delInstances := make([]model.Instance, 0, len(services)) + updateInstances := make([]model.Instance, 0, len(services)) + + newInstanceMap := make(map[string]model.Instance, len(services)) + + for i := range services { + if !services[i].Enable || !services[i].Valid { + // instance is not available,so ignore it + continue + } + host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port)) + instance := generateInstance(services[i]) + newInstanceMap[host] = instance + if old, ok := nl.instanceMap[host]; !ok { + //instance is not exsit in cache,add it to cache + addInstances = append(addInstances, instance) + } else { + //instance is not different from cache,update it to cache + if !reflect.DeepEqual(old, instance) { + updateInstances = append(updateInstances, instance) + } + } + } + + for host, inst := range nl.instanceMap { + if _, ok := newInstanceMap[host]; !ok { + //cache instance is not exsit in new instance list, remove it from cache + delInstances = append(delInstances, inst) + } + } + + nl.instanceMap = newInstanceMap + + for i := range addInstances { + newUrl := generateUrl(addInstances[i]) + if newUrl != nil { + nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeAdd}) + } + } + for i := range delInstances { + newUrl := generateUrl(delInstances[i]) + if newUrl != nil { + nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeDel}) + } + } + + for i := range updateInstances { + newUrl := generateUrl(updateInstances[i]) + if newUrl != nil { + nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EvnetTypeUpdate}) + } + } +} + +func getSubscribeName(url common.URL) string { + var buffer bytes.Buffer + + buffer.Write([]byte(common.DubboNodes[common.PROVIDER])) + appendParam(&buffer, url, constant.INTERFACE_KEY) + appendParam(&buffer, url, constant.VERSION_KEY) + appendParam(&buffer, url, constant.GROUP_KEY) + return buffer.String() +} + +func (nl *nacosListener) startListen() error { + if nl.namingClient == nil { + return perrors.New("nacos naming client stopped") + } + serviceName := getSubscribeName(nl.listenUrl) + nl.subscribeParam = &vo.SubscribeParam{ServiceName: serviceName, SubscribeCallback: nl.Callback} + return nl.namingClient.Subscribe(nl.subscribeParam) +} + +func (nl *nacosListener) stopListen() error { + return nl.namingClient.Unsubscribe(nl.subscribeParam) +} + +func (nl *nacosListener) process(configType *remoting.ConfigChangeEvent) { + nl.events <- configType +} + +func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { + for { + select { + case <-nl.done: + logger.Warnf("nacos listener is close!listenUrl:%+v", nl.listenUrl) + return nil, perrors.New("listener stopped") + + case e := <-nl.events: + logger.Debugf("got nacos event %s", e) + return ®istry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil + } + } +} + +func (nl *nacosListener) Close() { + nl.stopListen() + close(nl.done) +} diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go new file mode 100644 index 0000000000..f10e230bc4 --- /dev/null +++ b/registry/nacos/registry.go @@ -0,0 +1,176 @@ +package nacos + +import ( + "bytes" + "net" + "strconv" + "strings" + "time" +) +import ( + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" + "github.com/nacos-group/nacos-sdk-go/vo" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/utils" + "github.com/apache/dubbo-go/registry" +) + +var ( + localIP = "" +) + +func init() { + localIP, _ = utils.GetLocalIP() + extension.SetRegistry(constant.NACOS_KEY, newNacosRegistry) +} + +type nacosRegistry struct { + *common.URL + namingClient naming_client.INamingClient +} + +func getNacosConfig(url *common.URL) (map[string]interface{}, error) { + if url == nil { + return nil, perrors.New("url is empty!") + } + if len(url.Location) == 0 { + return nil, perrors.New("url.location is empty!") + } + configMap := make(map[string]interface{}, 2) + + addresses := strings.Split(url.Location, ",") + serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) + for _, addr := range addresses { + ip, portStr, err := net.SplitHostPort(addr) + if err != nil { + return nil, perrors.WithMessagef(err, "split [%s] ", addr) + } + port, _ := strconv.Atoi(portStr) + serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{ + IpAddr: ip, + Port: uint64(port), + }) + } + configMap["serverConfigs"] = serverConfigs + + var clientConfig nacosConstant.ClientConfig + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + return nil, err + } + clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) + clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs + clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") + clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") + clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") + clientConfig.NotLoadCacheAtStart = true + configMap["clientConfig"] = clientConfig + + return configMap, nil +} + +func newNacosRegistry(url *common.URL) (registry.Registry, error) { + nacosConfig, err := getNacosConfig(url) + if err != nil { + return nil, err + } + client, err := clients.CreateNamingClient(nacosConfig) + if err != nil { + return nil, err + } + registry := nacosRegistry{ + URL: url, + namingClient: client, + } + return ®istry, nil +} + +func getCategory(url common.URL) string { + role, _ := strconv.Atoi(url.GetParam(constant.ROLE_KEY, strconv.Itoa(constant.NACOS_DEFAULT_ROLETYPE))) + category := common.DubboNodes[role] + return category +} + +func getServiceName(url common.URL) string { + var buffer bytes.Buffer + + buffer.Write([]byte(getCategory(url))) + appendParam(&buffer, url, constant.INTERFACE_KEY) + appendParam(&buffer, url, constant.VERSION_KEY) + appendParam(&buffer, url, constant.GROUP_KEY) + return buffer.String() +} + +func appendParam(target *bytes.Buffer, url common.URL, key string) { + value := url.GetParam(key, "") + if strings.TrimSpace(value) != "" { + target.Write([]byte(constant.NACOS_SERVICE_NAME_SEPARATOR)) + target.Write([]byte(value)) + } +} + +func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstanceParam { + category := getCategory(url) + params := make(map[string]string, len(url.Params)+3) + for k, _ := range url.Params { + params[k] = url.Params.Get(k) + } + params[constant.NACOS_CATEGORY_KEY] = category + params[constant.NACOS_PROTOCOL_KEY] = url.Protocol + params[constant.NACOS_PATH_KEY] = url.Path + if len(url.Ip) == 0 { + url.Ip = localIP + } + if len(url.Port) == 0 || url.Port == "0" { + url.Port = "80" + } + port, _ := strconv.Atoi(url.Port) + instance := vo.RegisterInstanceParam{ + Ip: url.Ip, + Port: uint64(port), + Metadata: params, + Weight: 1, + Enable: true, + Healthy: true, + Ephemeral: true, + ServiceName: serviceName, + } + return instance +} + +func (nr *nacosRegistry) Register(url common.URL) error { + serviceName := getServiceName(url) + param := createRegisterParam(url, serviceName) + isRegistry, err := nr.namingClient.RegisterInstance(param) + if err != nil { + return err + } + if !isRegistry { + return perrors.New("registry [" + serviceName + "] to nacos failed") + } + return nil +} + +func (nr *nacosRegistry) Subscribe(conf common.URL) (registry.Listener, error) { + return NewNacosListener(conf, nr.namingClient) +} + +func (nr *nacosRegistry) GetUrl() common.URL { + return *nr.URL +} + +func (nr *nacosRegistry) IsAvailable() bool { + return true +} + +func (nr *nacosRegistry) Destroy() { + return +} diff --git a/registry/nacos/registry_test.go b/registry/nacos/registry_test.go new file mode 100644 index 0000000000..97dea0d2b0 --- /dev/null +++ b/registry/nacos/registry_test.go @@ -0,0 +1,174 @@ +package nacos + +import ( + "context" + "encoding/json" + "net/url" + "strconv" + "testing" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/vo" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +func Test_Register(t *testing.T) { + regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + urlMap := url.Values{} + urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) + urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider") + urlMap.Set(constant.VERSION_KEY, "1.0.0") + urlMap.Set(constant.CLUSTER_KEY, "mock") + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + + reg, err := newNacosRegistry(®url) + assert.Nil(t, err) + if err != nil { + t.Errorf("new nacos registry error:%s \n", err.Error()) + return + } + err = reg.Register(url) + assert.Nil(t, err) + if err != nil { + t.Errorf("register error:%s \n", err.Error()) + return + } + nacosReg := reg.(*nacosRegistry) + service, _ := nacosReg.namingClient.GetService(vo.GetServiceParam{ServiceName: "providers:com.ikurento.user.UserProvider:1.0.0:guangzhou-idc"}) + data, _ := json.Marshal(service) + t.Logf(string(data)) + assert.Equal(t, 1, len(service.Hosts)) +} + +func TestNacosRegistry_Subscribe(t *testing.T) { + regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + urlMap := url.Values{} + urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) + urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider") + urlMap.Set(constant.VERSION_KEY, "1.0.0") + urlMap.Set(constant.CLUSTER_KEY, "mock") + urlMap.Set(constant.NACOS_PATH_KEY, "") + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + + reg, _ := newNacosRegistry(®url) + err := reg.Register(url) + assert.Nil(t, err) + if err != nil { + t.Errorf("new nacos registry error:%s \n", err.Error()) + return + } + + regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + reg2, _ := newNacosRegistry(®url) + listener, err := reg2.Subscribe(url) + assert.Nil(t, err) + if err != nil { + t.Errorf("subscribe error:%s \n", err.Error()) + return + } + serviceEvent, _ := listener.Next() + assert.NoError(t, err) + if err != nil { + t.Errorf("listener error:%s \n", err.Error()) + return + } + t.Logf("serviceEvent:%+v \n", serviceEvent) + assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String()) + +} + +func TestNacosRegistry_Subscribe_del(t *testing.T) { + regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + urlMap := url.Values{} + urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) + urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider") + urlMap.Set(constant.VERSION_KEY, "1.0.0") + urlMap.Set(constant.CLUSTER_KEY, "mock") + urlMap.Set(constant.NACOS_PATH_KEY, "") + url1, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + url2, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.2:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + + reg, _ := newNacosRegistry(®url) + err := reg.Register(url1) + assert.Nil(t, err) + if err != nil { + t.Errorf("register1 error:%s \n", err.Error()) + return + } + err = reg.Register(url2) + assert.Nil(t, err) + if err != nil { + t.Errorf("register2 error:%s \n", err.Error()) + return + } + + regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + reg2, _ := newNacosRegistry(®url) + listener, err := reg2.Subscribe(url1) + assert.Nil(t, err) + if err != nil { + t.Errorf("subscribe error:%s \n", err.Error()) + return + } + + serviceEvent1, _ := listener.Next() + assert.NoError(t, err) + if err != nil { + t.Errorf("listener1 error:%s \n", err.Error()) + return + } + t.Logf("serviceEvent1:%+v \n", serviceEvent1) + assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent1.String()) + + serviceEvent2, _ := listener.Next() + assert.NoError(t, err) + if err != nil { + t.Errorf("listener2 error:%s \n", err.Error()) + return + } + t.Logf("serviceEvent2:%+v \n", serviceEvent2) + assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent2.String()) + + nacosReg := reg.(*nacosRegistry) + //deregister instance to mock instance offline + nacosReg.namingClient.DeregisterInstance(vo.DeregisterInstanceParam{Ip: "127.0.0.2", Port: 20000, ServiceName: "providers:com.ikurento.user.UserProvider:1.0.0:guangzhou-idc"}) + + serviceEvent3, _ := listener.Next() + assert.NoError(t, err) + if err != nil { + return + } + t.Logf("serviceEvent3:%+v \n", serviceEvent3) + assert.Regexp(t, ".*ServiceEvent{Action{delete}.*", serviceEvent3.String()) +} + +func TestNacosListener_Close(t *testing.T) { + regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + urlMap := url.Values{} + urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) + urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider2") + urlMap.Set(constant.VERSION_KEY, "1.0.0") + urlMap.Set(constant.CLUSTER_KEY, "mock") + urlMap.Set(constant.NACOS_PATH_KEY, "") + url1, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider2", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + reg, _ := newNacosRegistry(®url) + listener, err := reg.Subscribe(url1) + assert.Nil(t, err) + if err != nil { + t.Errorf("subscribe error:%s \n", err.Error()) + return + } + listener.Close() + _, err = listener.Next() + assert.NotNil(t, err) +} diff --git a/remoting/listener.go b/remoting/listener.go index da30f6989d..fd566f3538 100644 --- a/remoting/listener.go +++ b/remoting/listener.go @@ -52,6 +52,7 @@ const ( var serviceEventTypeStrings = [...]string{ "add", "delete", + "update", } func (t EventType) String() string {