From 98ce5dc085dcedb87a304cb06e4d14ea4248c28e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 6 Aug 2024 12:38:10 +0200 Subject: [PATCH 1/4] add version to key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- ocis-pkg/natsjsregistry/registry.go | 4 ++-- ocis-pkg/natsjsregistry/watcher.go | 19 ++++++++++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/ocis-pkg/natsjsregistry/registry.go b/ocis-pkg/natsjsregistry/registry.go index b66896f168b..1f5c3036ce0 100644 --- a/ocis-pkg/natsjsregistry/registry.go +++ b/ocis-pkg/natsjsregistry/registry.go @@ -95,7 +95,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO return err } return n.store.Write(&store.Record{ - Key: s.Name + _serviceDelimiter + server.DefaultId, + Key: s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version, Value: b, Expiry: options.TTL, }) @@ -105,7 +105,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error { n.lock.RLock() defer n.lock.RUnlock() - return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId) + return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version) } // GetService gets a specific service from the registry diff --git a/ocis-pkg/natsjsregistry/watcher.go b/ocis-pkg/natsjsregistry/watcher.go index 867d726261c..8ca81b2bf22 100644 --- a/ocis-pkg/natsjsregistry/watcher.go +++ b/ocis-pkg/natsjsregistry/watcher.go @@ -3,6 +3,7 @@ package natsjsregistry import ( "encoding/json" "errors" + "strings" natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv" "github.com/nats-io/nats.go" @@ -48,9 +49,21 @@ func (w *Watcher) Next() (*registry.Result, error) { } var svc registry.Service - if err := json.Unmarshal(kve.Value.Data, &svc); err != nil { - _ = w.stop() - return nil, err + if kve.Value.Data == nil { + // fake a service + parts := strings.SplitN(kve.Value.Key, _serviceDelimiter, 3) + if len(parts) != 3 { + return nil, errors.New("invalid service key") + } + svc.Name = parts[0] + // ocis registers nodes with a - seperator + svc.Nodes = []*registry.Node{{Id: parts[0] + "-" + parts[1]}} + svc.Version = parts[2] + } else { + if err := json.Unmarshal(kve.Value.Data, &svc); err != nil { + _ = w.stop() + return nil, err + } } return ®istry.Result{ From 7834ad6705aa27ffcf644182c71bf269b86f6d20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 6 Aug 2024 12:38:50 +0200 Subject: [PATCH 2/4] aggregate services by versions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- ocis-pkg/natsjsregistry/registry.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/ocis-pkg/natsjsregistry/registry.go b/ocis-pkg/natsjsregistry/registry.go index 1f5c3036ce0..78cc3b1b543 100644 --- a/ocis-pkg/natsjsregistry/registry.go +++ b/ocis-pkg/natsjsregistry/registry.go @@ -138,20 +138,28 @@ func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Serv return nil, err } - svcs := make([]*registry.Service, 0, len(keys)) + versions := map[string]*registry.Service{} for _, k := range keys { - s, err := n.getService(k) + s, err := n.getNode(k) if err != nil { // TODO: continue ? return nil, err } + if versions[s.Version] == nil { + versions[s.Version] = s + } else { + versions[s.Version].Nodes = append(versions[s.Version].Nodes, s.Nodes...) + } + } + svcs := make([]*registry.Service, 0, len(versions)) + for _, s := range versions { svcs = append(svcs, s) - } return svcs, nil } -func (n *storeregistry) getService(s string) (*registry.Service, error) { +// getNode retrieves a node from the store. It returns a service to also keep track of the version. +func (n *storeregistry) getNode(s string) (*registry.Service, error) { recs, err := n.store.Read(s) if err != nil { return nil, err From b356a9b21aa7df669e148c59c3634cb7f24d38ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 6 Aug 2024 12:42:36 +0200 Subject: [PATCH 3/4] update changelog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- changelog/unreleased/fix-natsjskv-registry-2.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/changelog/unreleased/fix-natsjskv-registry-2.md b/changelog/unreleased/fix-natsjskv-registry-2.md index 4649c8376bf..d64f9f91131 100644 --- a/changelog/unreleased/fix-natsjskv-registry-2.md +++ b/changelog/unreleased/fix-natsjskv-registry-2.md @@ -1,6 +1,8 @@ Bugfix: Repair nats-js-kv registry The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it. +Internally, it can now distinguish services by version and will aggregate all nodes of the same version into a single service, as expected by the registry cache and watcher. +https://github.com/owncloud/ocis/pull/9734 https://github.com/owncloud/ocis/pull/9726 https://github.com/owncloud/ocis/pull/9656 From c3f170b7b617fadc5904359950aab84718c9b39a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 6 Aug 2024 13:04:45 +0200 Subject: [PATCH 4/4] fix comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- ocis-pkg/natsjsregistry/watcher.go | 2 +- ocis-pkg/registry/service.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ocis-pkg/natsjsregistry/watcher.go b/ocis-pkg/natsjsregistry/watcher.go index 8ca81b2bf22..9376c5ae42f 100644 --- a/ocis-pkg/natsjsregistry/watcher.go +++ b/ocis-pkg/natsjsregistry/watcher.go @@ -56,7 +56,7 @@ func (w *Watcher) Next() (*registry.Result, error) { return nil, errors.New("invalid service key") } svc.Name = parts[0] - // ocis registers nodes with a - seperator + // ocis registers nodes with a - separator svc.Nodes = []*registry.Node{{Id: parts[0] + "-" + parts[1]}} svc.Version = parts[2] } else { diff --git a/ocis-pkg/registry/service.go b/ocis-pkg/registry/service.go index 65c62e8d262..aebccc75202 100644 --- a/ocis-pkg/registry/service.go +++ b/ocis-pkg/registry/service.go @@ -65,6 +65,7 @@ func BuildHTTPService(serviceID, address string, version string) *mRegistry.Serv } node := &mRegistry.Node{ + // This id is read by the registry watcher Id: serviceID + "-" + server.DefaultId, Address: net.JoinHostPort(addr, fmt.Sprint(port)), Metadata: make(map[string]string),