From 8bdc29f70f45bb06c1f3dc5693f5fb7c881cb80d Mon Sep 17 00:00:00 2001 From: lzp0412 <641785844@qq.com> Date: Sat, 3 Aug 2019 21:47:15 +0800 Subject: [PATCH 1/5] support nacos registry --- common/constant/key.go | 11 ++ config/reference_config.go | 1 + config/registry_config.go | 11 +- config/service_config.go | 1 + go.mod | 13 +++ go.sum | 61 +++++++++++ registry/nacos/listener.go | 188 ++++++++++++++++++++++++++++++++ registry/nacos/registry.go | 176 ++++++++++++++++++++++++++++++ registry/nacos/registry_test.go | 174 +++++++++++++++++++++++++++++ remoting/listener.go | 1 + 10 files changed, 633 insertions(+), 4 deletions(-) create mode 100644 registry/nacos/listener.go create mode 100644 registry/nacos/registry.go create mode 100644 registry/nacos/registry_test.go diff --git a/common/constant/key.go b/common/constant/key.go index bca658b262..b7fc4ad846 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -84,3 +84,14 @@ const ( ProviderConfigPrefix = "dubbo.provider." ConsumerConfigPrefix = "dubbo.consumer." ) + +const ( + NACOS_DEFAULT_ROLETYPE = 3 + NACOS_CACHE_DIR_KEY = "cacheDir" + NACOS_LOG_DIR_KEY = "logDir" + NACOS_ENDPOINT = "endpoint" + NACOS_SERVICE_NAME_SEPARATOR = ":" + NACOS_CATEGORY_KEY = "category" + NACOS_PROTOCOL_KEY = "protocol" + NACOS_PATH_KEY = "path" +) diff --git a/config/reference_config.go b/config/reference_config.go index a5b7d50db5..f0c163adfd 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -157,6 +157,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(refconfig.Retries, 10)) urlMap.Set(constant.GROUP_KEY, refconfig.Group) urlMap.Set(constant.VERSION_KEY, refconfig.Version) + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) //getty invoke async or sync urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async)) diff --git a/config/registry_config.go b/config/registry_config.go index 3d54c348aa..0abdab810f 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -36,9 +36,10 @@ type RegistryConfig struct { TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second Group string `yaml:"group" json:"group,omitempty" property:"group"` //for registry - Address string `yaml:"address" json:"address,omitempty" property:"address"` - Username string `yaml:"username" json:"username,omitempty" property:"username"` - Password string `yaml:"password" json:"password,omitempty" property:"password"` + Address string `yaml:"address" json:"address,omitempty" property:"address"` + Username string `yaml:"username" json:"username,omitempty" property:"username"` + Password string `yaml:"password" json:"password,omitempty" property:"password"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` } func (*RegistryConfig) Prefix() string { @@ -109,6 +110,8 @@ func (regconfig *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType))) urlMap.Set(constant.REGISTRY_KEY, regconfig.Protocol) urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, regconfig.TimeoutStr) - + for k, v := range regconfig.Params { + urlMap.Set(k, v) + } return urlMap } diff --git a/config/service_config.go b/config/service_config.go index 76913319f6..8b4a7d1b22 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -163,6 +163,7 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values { urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(srvconfig.Retries, 10)) urlMap.Set(constant.GROUP_KEY, srvconfig.Group) urlMap.Set(constant.VERSION_KEY, srvconfig.Version) + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) //application info urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name) urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization) diff --git a/go.mod b/go.mod index 61ce25b5cd..fb5c5118a8 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,26 @@ module github.com/apache/dubbo-go require ( + github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 + github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect github.com/dubbogo/getty v1.2.2 github.com/dubbogo/gost v1.1.1 + github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect + github.com/go-errors/errors v1.0.1 // indirect + github.com/golang/mock v1.3.1 // indirect + github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect + github.com/jonboulle/clockwork v0.1.0 // indirect + github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect + github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect + github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect github.com/magiconair/properties v1.8.1 + github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/stretchr/testify v1.3.0 + github.com/tebeka/strftime v0.1.3 // indirect + github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect go.uber.org/atomic v1.4.0 go.uber.org/zap v1.10.0 gopkg.in/yaml.v2 v2.2.2 diff --git a/go.sum b/go.sum index 7718fce852..e7ca641c0c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,11 @@ +github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0= +github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= +github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 h1:7zJlM+8bpCAUhv03TZnXkT4MLlLWng1s7An8CLuN73E= github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= +github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= +github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 h1:D21IyuvjDCshj1/qq+pCNd3VZOAEI9jy6Bi131YlXgI= +github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -8,21 +14,67 @@ github.com/dubbogo/getty v1.2.2 h1:qDC9WXjxcs5NPvWZz2ruVKBKr2r1Jjm6i0Sq//CQwbE= github.com/dubbogo/getty v1.2.2/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU= github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= +github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU= +github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw= +github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= +github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= +github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4= +github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= +github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/json-iterator/go v1.1.5 h1:gL2yXlmiIo4+t+y32d4WGwOjKGYcGOuyrg46vadswDE= +github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 h1:0iQektZGS248WXmGIYOwRXSQhD4qn3icjMpuxwO7qlo= +github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE= +github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f h1:sgUSP4zdTUZYZgAGGtN5Lxk92rK+JUFOwf+FT99EEI4= +github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod h1:UGmTpUd3rjbtfIpwAPrcfmGf/Z1HS95TATB+m57TPB8= +github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 h1:Bvq8AziQ5jFF4BHGAEDSqwPW1NJS3XshxbRCxtjFAZc= +github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb h1:lbmvw8r9W55w+aQgWn35W1nuleRIECMoqUrmwAOAvoI= +github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY= github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs= +github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto= +github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ= +github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D4Jj5eZ41Zm3IH/J8OElK1Qtd7tVKAwLk= +github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= @@ -30,13 +82,22 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk= +gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go new file mode 100644 index 0000000000..2d851ecf63 --- /dev/null +++ b/registry/nacos/listener.go @@ -0,0 +1,188 @@ +package nacos + +import ( + "bytes" + "net/url" + "reflect" + "strconv" + "sync" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + "github.com/nacos-group/nacos-sdk-go/model" + "github.com/nacos-group/nacos-sdk-go/vo" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting" +) + +type nacosListener struct { + sync.Mutex + namingClient naming_client.INamingClient + listenUrl common.URL + events chan *remoting.ConfigChangeEvent + hostMapInstance map[string]model.Instance + done chan struct{} + subscribeParam *vo.SubscribeParam +} + +func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) { + listener := &nacosListener{namingClient: namingClient, listenUrl: url, events: make(chan *remoting.ConfigChangeEvent, 32), hostMapInstance: map[string]model.Instance{}, done: make(chan struct{})} + err := listener.startListen() + return listener, err +} + +func generateInstance(ss model.SubscribeService) model.Instance { + return model.Instance{ + InstanceId: ss.InstanceId, + Ip: ss.Ip, + Port: ss.Port, + ServiceName: ss.ServiceName, + Valid: ss.Valid, + Enable: ss.Enable, + Weight: ss.Weight, + Metadata: ss.Metadata, + ClusterName: ss.ClusterName, + } +} + +func generateUrl(instance model.Instance) *common.URL { + if instance.Metadata == nil { + logger.Errorf("nacos instance metadata is empty,instance:%+v", instance) + return nil + } + path := instance.Metadata["path"] + myInterface := instance.Metadata["interface"] + if path == "" && myInterface == "" { + logger.Errorf("nacos instance metadata does not have both path key and interface key,instance:%+v", instance) + return nil + } + if path == "" && myInterface != "" { + path = "/" + myInterface + } + protocol := instance.Metadata["protocol"] + if protocol == "" { + logger.Errorf("nacos instance metadata does not have protocol key,instance:%+v", instance) + return nil + } + urlMap := url.Values{} + for k, v := range instance.Metadata { + urlMap.Set(k, v) + } + return common.NewURLWithOptions(common.WithIp(instance.Ip), common.WithPort(strconv.Itoa(int(instance.Port))), common.WithProtocol(protocol), common.WithParams(urlMap), common.WithPath(path)) +} + +func (nl *nacosListener) Callback(services []model.SubscribeService, err error) { + if err != nil { + logger.Errorf("nacos subscribe callback error:%s ", err.Error()) + return + } + nl.Lock() + defer nl.Unlock() + var addInstances []model.Instance + var delInstances []model.Instance + var updateInstances []model.Instance + + newInstanceMap := map[string]model.Instance{} + + for _, s := range services { + if !s.Enable || !s.Valid { + //实例不可以用 + continue + } + host := s.Ip + ":" + strconv.Itoa(int(s.Port)) + instance := generateInstance(s) + newInstanceMap[host] = instance + if old, ok := nl.hostMapInstance[host]; !ok { + //新增实例节点 + addInstances = append(addInstances, instance) + } else { + //实例更新 + if !reflect.DeepEqual(old, instance) { + updateInstances = append(updateInstances, instance) + } + } + } + + //判断旧的实例是否在新实例列表中,不存在则代表实例已下线 + for host, inst := range nl.hostMapInstance { + if _, ok := newInstanceMap[host]; !ok { + delInstances = append(delInstances, inst) + } + } + + nl.hostMapInstance = newInstanceMap + + for _, add := range addInstances { + newUrl := generateUrl(add) + if newUrl != nil { + nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeAdd}) + } + } + for _, del := range delInstances { + newUrl := generateUrl(del) + if newUrl != nil { + nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeDel}) + } + } + + for _, update := range updateInstances { + newUrl := generateUrl(update) + if newUrl != nil { + nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EvnetTypeUpdate}) + } + } +} + +func getSubscribeName(url common.URL) string { + var buffer bytes.Buffer + + buffer.Write([]byte(common.DubboNodes[common.PROVIDER])) + appendParam(&buffer, url, constant.INTERFACE_KEY) + appendParam(&buffer, url, constant.VERSION_KEY) + appendParam(&buffer, url, constant.GROUP_KEY) + return buffer.String() +} + +func (nl *nacosListener) startListen() error { + if nl.namingClient == nil { + return perrors.New("nacos naming client stopped") + } + serviceName := getSubscribeName(nl.listenUrl) + nl.subscribeParam = &vo.SubscribeParam{ServiceName: serviceName, SubscribeCallback: nl.Callback} + return nl.namingClient.Subscribe(nl.subscribeParam) +} + +func (nl *nacosListener) stopListen() error { + return nl.namingClient.Unsubscribe(nl.subscribeParam) +} + +func (nl *nacosListener) process(configType *remoting.ConfigChangeEvent) { + nl.events <- configType +} + +func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { + for { + select { + case <-nl.done: + logger.Warnf("nacos listener is close!") + return nil, perrors.New("listener stopped") + + case e := <-nl.events: + logger.Debugf("got nacos event %s", e) + return ®istry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil + } + } +} + +func (nl *nacosListener) Close() { + nl.stopListen() + close(nl.done) +} diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go new file mode 100644 index 0000000000..73656e25a0 --- /dev/null +++ b/registry/nacos/registry.go @@ -0,0 +1,176 @@ +package nacos + +import ( + "bytes" + "net" + "strconv" + "strings" + "time" +) +import ( + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" + "github.com/nacos-group/nacos-sdk-go/vo" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/utils" + "github.com/apache/dubbo-go/registry" +) + +var ( + localIP = "" +) + +func init() { + localIP, _ = utils.GetLocalIP() + extension.SetRegistry("nacos", newNacosRegistry) +} + +type nacosRegistry struct { + *common.URL + namingClient naming_client.INamingClient +} + +func getNacosConfig(url *common.URL) (map[string]interface{}, error) { + if url == nil { + return nil, perrors.New("url is empty!") + } + if url.Location == "" { + return nil, perrors.New("url.location is empty!") + } + configMap := make(map[string]interface{}) + + var serverConfigs []nacosConstant.ServerConfig + addresses := strings.Split(url.Location, ",") + for _, addr := range addresses { + ip, portStr, err := net.SplitHostPort(addr) + if err != nil { + return nil, perrors.WithMessagef(err, "split [%s] ", addr) + } + port, _ := strconv.Atoi(portStr) + serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{ + IpAddr: ip, + Port: uint64(port), + }) + } + configMap["serverConfigs"] = serverConfigs + + var clientConfig nacosConstant.ClientConfig + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + return nil, err + } + clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) + clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs + clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") + clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") + clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") + clientConfig.NotLoadCacheAtStart = true + configMap["clientConfig"] = clientConfig + + return configMap, nil +} + +func newNacosRegistry(url *common.URL) (registry.Registry, error) { + nacosConfig, err := getNacosConfig(url) + if err != nil { + return nil, err + } + client, err := clients.CreateNamingClient(nacosConfig) + if err != nil { + return nil, err + } + registry := nacosRegistry{ + URL: url, + namingClient: client, + } + return ®istry, nil +} + +func getCategory(url common.URL) string { + role, _ := strconv.Atoi(url.GetParam(constant.ROLE_KEY, strconv.Itoa(constant.NACOS_DEFAULT_ROLETYPE))) + category := common.DubboNodes[role] + return category +} + +func getServiceName(url common.URL) string { + var buffer bytes.Buffer + + buffer.Write([]byte(getCategory(url))) + appendParam(&buffer, url, constant.INTERFACE_KEY) + appendParam(&buffer, url, constant.VERSION_KEY) + appendParam(&buffer, url, constant.GROUP_KEY) + return buffer.String() +} + +func appendParam(target *bytes.Buffer, url common.URL, key string) { + value := url.GetParam(key, "") + if strings.TrimSpace(value) != "" { + target.Write([]byte(constant.NACOS_SERVICE_NAME_SEPARATOR)) + target.Write([]byte(value)) + } +} + +func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstanceParam { + category := getCategory(url) + params := map[string]string{} + for k, _ := range url.Params { + params[k] = url.Params.Get(k) + } + params[constant.NACOS_CATEGORY_KEY] = category + params[constant.NACOS_PROTOCOL_KEY] = url.Protocol + params[constant.NACOS_PATH_KEY] = url.Path + if url.Ip == "" { + url.Ip = localIP + } + if url.Port == "" || url.Port == "0" { + url.Port = "80" + } + port, _ := strconv.Atoi(url.Port) + instance := vo.RegisterInstanceParam{ + Ip: url.Ip, + Port: uint64(port), + Metadata: params, + Weight: 1, + Enable: true, + Healthy: true, + Ephemeral: true, + ServiceName: serviceName, + } + return instance +} + +func (nr *nacosRegistry) Register(url common.URL) error { + serviceName := getServiceName(url) + param := createRegisterParam(url, serviceName) + isRegistry, err := nr.namingClient.RegisterInstance(param) + if err != nil { + return err + } + if !isRegistry { + return perrors.New("registry to nacos failed") + } + return nil +} + +func (nr *nacosRegistry) Subscribe(conf common.URL) (registry.Listener, error) { + return NewNacosListener(conf, nr.namingClient) +} + +func (nr *nacosRegistry) GetUrl() common.URL { + return *nr.URL +} + +func (nr *nacosRegistry) IsAvailable() bool { + return true +} + +func (nr *nacosRegistry) Destroy() { + return +} diff --git a/registry/nacos/registry_test.go b/registry/nacos/registry_test.go new file mode 100644 index 0000000000..dfc4e6e515 --- /dev/null +++ b/registry/nacos/registry_test.go @@ -0,0 +1,174 @@ +package nacos + +import ( + "context" + "encoding/json" + "net/url" + "strconv" + "testing" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/vo" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +func Test_Register(t *testing.T) { + regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + urlMap := url.Values{} + urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) + urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider") + urlMap.Set(constant.VERSION_KEY, "1.0.0") + urlMap.Set(constant.CLUSTER_KEY, "mock") + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + + reg, err := newNacosRegistry(®url) + assert.Nil(t, err) + if err != nil { + t.Errorf("new nacos registry error:%s \n", err.Error()) + return + } + err = reg.Register(url) + assert.Nil(t, err) + if err != nil { + t.Errorf("register error:%s \n", err.Error()) + return + } + nacosReg := reg.(*nacosRegistry) + service, _ := nacosReg.namingClient.GetService(vo.GetServiceParam{ServiceName: "providers:com.ikurento.user.UserProvider:1.0.0:guangzhou-idc"}) + data, _ := json.Marshal(service) + t.Logf(string(data)) + assert.Equal(t, 1, len(service.Hosts)) +} + +func TestNacosRegistry_Subscribe(t *testing.T) { + regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + urlMap := url.Values{} + urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) + urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider") + urlMap.Set(constant.VERSION_KEY, "1.0.0") + urlMap.Set(constant.CLUSTER_KEY, "mock") + urlMap.Set(constant.NACOS_PATH_KEY, "") + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + + reg, _ := newNacosRegistry(®url) + err := reg.Register(url) + assert.Nil(t, err) + if err != nil { + t.Errorf("new nacos registry error:%s \n", err.Error()) + return + } + + regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + reg2, _ := newNacosRegistry(®url) + listener, err := reg2.Subscribe(url) + assert.Nil(t, err) + if err != nil { + t.Errorf("subscribe error:%s \n", err.Error()) + return + } + serviceEvent, _ := listener.Next() + assert.NoError(t, err) + if err != nil { + t.Errorf("listener error:%s \n", err.Error()) + return + } + t.Logf("serviceEvent:%+v \n", serviceEvent) + assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String()) + +} + +func TestNacosRegistry_Subscribe_del(t *testing.T) { + regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + urlMap := url.Values{} + urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) + urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider") + urlMap.Set(constant.VERSION_KEY, "1.0.0") + urlMap.Set(constant.CLUSTER_KEY, "mock") + urlMap.Set(constant.NACOS_PATH_KEY, "") + url1, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + url2, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.2:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + + reg, _ := newNacosRegistry(®url) + err := reg.Register(url1) + assert.Nil(t, err) + if err != nil { + t.Errorf("register1 error:%s \n", err.Error()) + return + } + err = reg.Register(url2) + assert.Nil(t, err) + if err != nil { + t.Errorf("register2 error:%s \n", err.Error()) + return + } + + regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + reg2, _ := newNacosRegistry(®url) + listener, err := reg2.Subscribe(url1) + assert.Nil(t, err) + if err != nil { + t.Errorf("subscribe error:%s \n", err.Error()) + return + } + + serviceEvent1, _ := listener.Next() + assert.NoError(t, err) + if err != nil { + t.Errorf("listener1 error:%s \n", err.Error()) + return + } + t.Logf("serviceEvent1:%+v \n", serviceEvent1) + assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent1.String()) + + serviceEvent2, _ := listener.Next() + assert.NoError(t, err) + if err != nil { + t.Errorf("listener2 error:%s \n", err.Error()) + return + } + t.Logf("serviceEvent2:%+v \n", serviceEvent2) + assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent2.String()) + + nacosReg := reg.(*nacosRegistry) + //手动注销实例 + nacosReg.namingClient.DeregisterInstance(vo.DeregisterInstanceParam{Ip: "127.0.0.2", Port: 20000, ServiceName: "providers:com.ikurento.user.UserProvider:1.0.0:guangzhou-idc"}) + + serviceEvent3, _ := listener.Next() + assert.NoError(t, err) + if err != nil { + return + } + t.Logf("serviceEvent3:%+v \n", serviceEvent3) + assert.Regexp(t, ".*ServiceEvent{Action{delete}.*", serviceEvent3.String()) +} + +func TestNacosListener_Close(t *testing.T) { + regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + urlMap := url.Values{} + urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") + urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) + urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider2") + urlMap.Set(constant.VERSION_KEY, "1.0.0") + urlMap.Set(constant.CLUSTER_KEY, "mock") + urlMap.Set(constant.NACOS_PATH_KEY, "") + url1, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider2", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"})) + reg, _ := newNacosRegistry(®url) + listener, err := reg.Subscribe(url1) + assert.Nil(t, err) + if err != nil { + t.Errorf("subscribe error:%s \n", err.Error()) + return + } + listener.Close() + _, err = listener.Next() + assert.NotNil(t, err) +} diff --git a/remoting/listener.go b/remoting/listener.go index da30f6989d..fd566f3538 100644 --- a/remoting/listener.go +++ b/remoting/listener.go @@ -52,6 +52,7 @@ const ( var serviceEventTypeStrings = [...]string{ "add", "delete", + "update", } func (t EventType) String() string { From 5d51d2a0c531ba1af062f2176982a1ea6e30fe1a Mon Sep 17 00:00:00 2001 From: lizhipeng1 Date: Sat, 10 Aug 2019 20:37:00 +0800 Subject: [PATCH 2/5] coding standard --- common/constant/key.go | 1 + registry/nacos/listener.go | 62 +++++++++++++++++++++----------------- registry/nacos/registry.go | 16 +++++----- 3 files changed, 43 insertions(+), 36 deletions(-) diff --git a/common/constant/key.go b/common/constant/key.go index b7fc4ad846..24bd2d20c3 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -86,6 +86,7 @@ const ( ) const ( + NACOS_KEY = "nacos" NACOS_DEFAULT_ROLETYPE = 3 NACOS_CACHE_DIR_KEY = "cacheDir" NACOS_LOG_DIR_KEY = "logDir" diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go index 2d851ecf63..b45c3bed58 100644 --- a/registry/nacos/listener.go +++ b/registry/nacos/listener.go @@ -24,17 +24,22 @@ import ( ) type nacosListener struct { - sync.Mutex namingClient naming_client.INamingClient listenUrl common.URL events chan *remoting.ConfigChangeEvent hostMapInstance map[string]model.Instance + cacheLock sync.Mutex done chan struct{} subscribeParam *vo.SubscribeParam } func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) { - listener := &nacosListener{namingClient: namingClient, listenUrl: url, events: make(chan *remoting.ConfigChangeEvent, 32), hostMapInstance: map[string]model.Instance{}, done: make(chan struct{})} + listener := &nacosListener{ + namingClient: namingClient, + listenUrl: url, events: make(chan *remoting.ConfigChangeEvent, 32), + hostMapInstance: map[string]model.Instance{}, + done: make(chan struct{}), + } err := listener.startListen() return listener, err } @@ -60,15 +65,15 @@ func generateUrl(instance model.Instance) *common.URL { } path := instance.Metadata["path"] myInterface := instance.Metadata["interface"] - if path == "" && myInterface == "" { + if len(path) == 0 && len(myInterface) == 0 { logger.Errorf("nacos instance metadata does not have both path key and interface key,instance:%+v", instance) return nil } - if path == "" && myInterface != "" { + if len(path) == 0 && len(myInterface) != 0 { path = "/" + myInterface } protocol := instance.Metadata["protocol"] - if protocol == "" { + if len(protocol) == 0 { logger.Errorf("nacos instance metadata does not have protocol key,instance:%+v", instance) return nil } @@ -76,65 +81,66 @@ func generateUrl(instance model.Instance) *common.URL { for k, v := range instance.Metadata { urlMap.Set(k, v) } - return common.NewURLWithOptions(common.WithIp(instance.Ip), common.WithPort(strconv.Itoa(int(instance.Port))), common.WithProtocol(protocol), common.WithParams(urlMap), common.WithPath(path)) + return common.NewURLWithOptions(common.WithIp(instance.Ip), common.WithPort(strconv.Itoa(int(instance.Port))), + common.WithProtocol(protocol), common.WithParams(urlMap), common.WithPath(path)) } func (nl *nacosListener) Callback(services []model.SubscribeService, err error) { if err != nil { - logger.Errorf("nacos subscribe callback error:%s ", err.Error()) + logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam) return } - nl.Lock() - defer nl.Unlock() - var addInstances []model.Instance - var delInstances []model.Instance - var updateInstances []model.Instance + nl.cacheLock.Lock() + defer nl.cacheLock.Unlock() + addInstances := make([]model.Instance, 0, len(services)) + delInstances := make([]model.Instance, 0, len(services)) + updateInstances := make([]model.Instance, 0, len(services)) - newInstanceMap := map[string]model.Instance{} + newInstanceMap := make(map[string]model.Instance, len(services)) - for _, s := range services { - if !s.Enable || !s.Valid { - //实例不可以用 + for i := range services { + if !services[i].Enable || !services[i].Valid { + // instance is not available,so ignore it continue } - host := s.Ip + ":" + strconv.Itoa(int(s.Port)) - instance := generateInstance(s) + host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port)) + instance := generateInstance(services[i]) newInstanceMap[host] = instance if old, ok := nl.hostMapInstance[host]; !ok { - //新增实例节点 + //instance is not exsit in cache,add it to cache addInstances = append(addInstances, instance) } else { - //实例更新 + //instance is not different from cache,update it to cache if !reflect.DeepEqual(old, instance) { updateInstances = append(updateInstances, instance) } } } - //判断旧的实例是否在新实例列表中,不存在则代表实例已下线 for host, inst := range nl.hostMapInstance { if _, ok := newInstanceMap[host]; !ok { + //cache instance is not exsit in new instance list, remove it from cache delInstances = append(delInstances, inst) } } nl.hostMapInstance = newInstanceMap - for _, add := range addInstances { - newUrl := generateUrl(add) + for i := range addInstances { + newUrl := generateUrl(addInstances[i]) if newUrl != nil { nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeAdd}) } } - for _, del := range delInstances { - newUrl := generateUrl(del) + for i := range delInstances { + newUrl := generateUrl(delInstances[i]) if newUrl != nil { nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeDel}) } } - for _, update := range updateInstances { - newUrl := generateUrl(update) + for i := range updateInstances { + newUrl := generateUrl(updateInstances[i]) if newUrl != nil { nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EvnetTypeUpdate}) } @@ -172,7 +178,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { for { select { case <-nl.done: - logger.Warnf("nacos listener is close!") + logger.Warnf("nacos listener is close!listenUrl:%+v", nl.listenUrl) return nil, perrors.New("listener stopped") case e := <-nl.events: diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 73656e25a0..f10e230bc4 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -29,7 +29,7 @@ var ( func init() { localIP, _ = utils.GetLocalIP() - extension.SetRegistry("nacos", newNacosRegistry) + extension.SetRegistry(constant.NACOS_KEY, newNacosRegistry) } type nacosRegistry struct { @@ -41,13 +41,13 @@ func getNacosConfig(url *common.URL) (map[string]interface{}, error) { if url == nil { return nil, perrors.New("url is empty!") } - if url.Location == "" { + if len(url.Location) == 0 { return nil, perrors.New("url.location is empty!") } - configMap := make(map[string]interface{}) + configMap := make(map[string]interface{}, 2) - var serverConfigs []nacosConstant.ServerConfig addresses := strings.Split(url.Location, ",") + serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) for _, addr := range addresses { ip, portStr, err := net.SplitHostPort(addr) if err != nil { @@ -119,17 +119,17 @@ func appendParam(target *bytes.Buffer, url common.URL, key string) { func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstanceParam { category := getCategory(url) - params := map[string]string{} + params := make(map[string]string, len(url.Params)+3) for k, _ := range url.Params { params[k] = url.Params.Get(k) } params[constant.NACOS_CATEGORY_KEY] = category params[constant.NACOS_PROTOCOL_KEY] = url.Protocol params[constant.NACOS_PATH_KEY] = url.Path - if url.Ip == "" { + if len(url.Ip) == 0 { url.Ip = localIP } - if url.Port == "" || url.Port == "0" { + if len(url.Port) == 0 || url.Port == "0" { url.Port = "80" } port, _ := strconv.Atoi(url.Port) @@ -154,7 +154,7 @@ func (nr *nacosRegistry) Register(url common.URL) error { return err } if !isRegistry { - return perrors.New("registry to nacos failed") + return perrors.New("registry [" + serviceName + "] to nacos failed") } return nil } From b27dac4d91aeeac90496b3ac98d812ceb63ebe4b Mon Sep 17 00:00:00 2001 From: lzp0412 <641785844@qq.com> Date: Sat, 10 Aug 2019 21:15:47 +0800 Subject: [PATCH 3/5] coding standard --- registry/nacos/listener.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go index b45c3bed58..4096856def 100644 --- a/registry/nacos/listener.go +++ b/registry/nacos/listener.go @@ -81,8 +81,13 @@ func generateUrl(instance model.Instance) *common.URL { for k, v := range instance.Metadata { urlMap.Set(k, v) } - return common.NewURLWithOptions(common.WithIp(instance.Ip), common.WithPort(strconv.Itoa(int(instance.Port))), - common.WithProtocol(protocol), common.WithParams(urlMap), common.WithPath(path)) + return common.NewURLWithOptions( + common.WithIp(instance.Ip), + common.WithPort(strconv.Itoa(int(instance.Port))), + common.WithProtocol(protocol), + common.WithParams(urlMap), + common.WithPath(path), + ) } func (nl *nacosListener) Callback(services []model.SubscribeService, err error) { From 7f87ea6f61a8c5da057cfbcf5966b77f7f423d44 Mon Sep 17 00:00:00 2001 From: lzp0412 <641785844@qq.com> Date: Mon, 12 Aug 2019 11:02:01 +0800 Subject: [PATCH 4/5] coding standard --- registry/nacos/listener.go | 24 ++++++++++++------------ registry/nacos/registry_test.go | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go index 4096856def..c42abd0bb1 100644 --- a/registry/nacos/listener.go +++ b/registry/nacos/listener.go @@ -24,21 +24,21 @@ import ( ) type nacosListener struct { - namingClient naming_client.INamingClient - listenUrl common.URL - events chan *remoting.ConfigChangeEvent - hostMapInstance map[string]model.Instance - cacheLock sync.Mutex - done chan struct{} - subscribeParam *vo.SubscribeParam + namingClient naming_client.INamingClient + listenUrl common.URL + events chan *remoting.ConfigChangeEvent + instanceMap map[string]model.Instance + cacheLock sync.Mutex + done chan struct{} + subscribeParam *vo.SubscribeParam } func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) { listener := &nacosListener{ namingClient: namingClient, listenUrl: url, events: make(chan *remoting.ConfigChangeEvent, 32), - hostMapInstance: map[string]model.Instance{}, - done: make(chan struct{}), + instanceMap: map[string]model.Instance{}, + done: make(chan struct{}), } err := listener.startListen() return listener, err @@ -111,7 +111,7 @@ func (nl *nacosListener) Callback(services []model.SubscribeService, err error) host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port)) instance := generateInstance(services[i]) newInstanceMap[host] = instance - if old, ok := nl.hostMapInstance[host]; !ok { + if old, ok := nl.instanceMap[host]; !ok { //instance is not exsit in cache,add it to cache addInstances = append(addInstances, instance) } else { @@ -122,14 +122,14 @@ func (nl *nacosListener) Callback(services []model.SubscribeService, err error) } } - for host, inst := range nl.hostMapInstance { + for host, inst := range nl.instanceMap { if _, ok := newInstanceMap[host]; !ok { //cache instance is not exsit in new instance list, remove it from cache delInstances = append(delInstances, inst) } } - nl.hostMapInstance = newInstanceMap + nl.instanceMap = newInstanceMap for i := range addInstances { newUrl := generateUrl(addInstances[i]) diff --git a/registry/nacos/registry_test.go b/registry/nacos/registry_test.go index dfc4e6e515..97dea0d2b0 100644 --- a/registry/nacos/registry_test.go +++ b/registry/nacos/registry_test.go @@ -139,7 +139,7 @@ func TestNacosRegistry_Subscribe_del(t *testing.T) { assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent2.String()) nacosReg := reg.(*nacosRegistry) - //手动注销实例 + //deregister instance to mock instance offline nacosReg.namingClient.DeregisterInstance(vo.DeregisterInstanceParam{Ip: "127.0.0.2", Port: 20000, ServiceName: "providers:com.ikurento.user.UserProvider:1.0.0:guangzhou-idc"}) serviceEvent3, _ := listener.Next() From d448aa885ed7effd9c20aae8ef3ddec03d49fb3f Mon Sep 17 00:00:00 2001 From: lzp0412 <641785844@qq.com> Date: Mon, 12 Aug 2019 12:48:56 +0800 Subject: [PATCH 5/5] add depandency to go.mod --- go.mod | 4 +--- go.sum | 8 ++++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 689a929fa4..cc474591d6 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,12 @@ require ( github.com/Workiva/go-datastructures v1.0.50 github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 - github.com/davecgh/go-spew v1.1.1 // indirect github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect github.com/dubbogo/getty v1.2.2 github.com/dubbogo/gost v1.1.1 - github.com/golang/mock v1.3.1 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/go-errors/errors v1.0.1 // indirect - github.com/golang/mock v1.3.1 // indirect + github.com/golang/mock v1.3.1 github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect github.com/jonboulle/clockwork v0.1.0 // indirect github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect diff --git a/go.sum b/go.sum index 1022eac567..bad387e299 100644 --- a/go.sum +++ b/go.sum @@ -16,14 +16,14 @@ github.com/dubbogo/getty v1.2.2 h1:qDC9WXjxcs5NPvWZz2ruVKBKr2r1Jjm6i0Sq//CQwbE= github.com/dubbogo/getty v1.2.2/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU= github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= -github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= -github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU= github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw= github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -94,10 +94,10 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= -golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=