diff --git a/changelog/unreleased/fix-natsjskv-registry-2.md b/changelog/unreleased/fix-natsjskv-registry-2.md index 4649c8376bf..d64f9f91131 100644 --- a/changelog/unreleased/fix-natsjskv-registry-2.md +++ b/changelog/unreleased/fix-natsjskv-registry-2.md @@ -1,6 +1,8 @@ Bugfix: Repair nats-js-kv registry The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it. +Internally, it can now distinguish services by version and will aggregate all nodes of the same version into a single service, as expected by the registry cache and watcher. +https://github.com/owncloud/ocis/pull/9734 https://github.com/owncloud/ocis/pull/9726 https://github.com/owncloud/ocis/pull/9656 diff --git a/ocis-pkg/natsjsregistry/registry.go b/ocis-pkg/natsjsregistry/registry.go index b66896f168b..78cc3b1b543 100644 --- a/ocis-pkg/natsjsregistry/registry.go +++ b/ocis-pkg/natsjsregistry/registry.go @@ -95,7 +95,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO return err } return n.store.Write(&store.Record{ - Key: s.Name + _serviceDelimiter + server.DefaultId, + Key: s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version, Value: b, Expiry: options.TTL, }) @@ -105,7 +105,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error { n.lock.RLock() defer n.lock.RUnlock() - return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId) + return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version) } // GetService gets a specific service from the registry @@ -138,20 +138,28 @@ func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Serv return nil, err } - svcs := make([]*registry.Service, 0, len(keys)) + versions := map[string]*registry.Service{} for _, k := range keys { - s, err := n.getService(k) + s, err := n.getNode(k) if err != nil { // TODO: continue ? return nil, err } + if versions[s.Version] == nil { + versions[s.Version] = s + } else { + versions[s.Version].Nodes = append(versions[s.Version].Nodes, s.Nodes...) + } + } + svcs := make([]*registry.Service, 0, len(versions)) + for _, s := range versions { svcs = append(svcs, s) - } return svcs, nil } -func (n *storeregistry) getService(s string) (*registry.Service, error) { +// getNode retrieves a node from the store. It returns a service to also keep track of the version. +func (n *storeregistry) getNode(s string) (*registry.Service, error) { recs, err := n.store.Read(s) if err != nil { return nil, err diff --git a/ocis-pkg/natsjsregistry/watcher.go b/ocis-pkg/natsjsregistry/watcher.go index 867d726261c..9376c5ae42f 100644 --- a/ocis-pkg/natsjsregistry/watcher.go +++ b/ocis-pkg/natsjsregistry/watcher.go @@ -3,6 +3,7 @@ package natsjsregistry import ( "encoding/json" "errors" + "strings" natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv" "github.com/nats-io/nats.go" @@ -48,9 +49,21 @@ func (w *Watcher) Next() (*registry.Result, error) { } var svc registry.Service - if err := json.Unmarshal(kve.Value.Data, &svc); err != nil { - _ = w.stop() - return nil, err + if kve.Value.Data == nil { + // fake a service + parts := strings.SplitN(kve.Value.Key, _serviceDelimiter, 3) + if len(parts) != 3 { + return nil, errors.New("invalid service key") + } + svc.Name = parts[0] + // ocis registers nodes with a - separator + svc.Nodes = []*registry.Node{{Id: parts[0] + "-" + parts[1]}} + svc.Version = parts[2] + } else { + if err := json.Unmarshal(kve.Value.Data, &svc); err != nil { + _ = w.stop() + return nil, err + } } return ®istry.Result{ diff --git a/ocis-pkg/registry/service.go b/ocis-pkg/registry/service.go index 65c62e8d262..aebccc75202 100644 --- a/ocis-pkg/registry/service.go +++ b/ocis-pkg/registry/service.go @@ -65,6 +65,7 @@ func BuildHTTPService(serviceID, address string, version string) *mRegistry.Serv } node := &mRegistry.Node{ + // This id is read by the registry watcher Id: serviceID + "-" + server.DefaultId, Address: net.JoinHostPort(addr, fmt.Sprint(port)), Metadata: make(map[string]string),