Skip to content

Commit

Permalink
Merge pull request #9740 from owncloud/nats-registry-fixes
Browse files Browse the repository at this point in the history
Nats registry fixes
  • Loading branch information
butonic authored Aug 7, 2024
2 parents 04c2b4a + c3f170b commit 339ed19
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 9 deletions.
2 changes: 2 additions & 0 deletions changelog/unreleased/fix-natsjskv-registry-2.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Bugfix: Repair nats-js-kv registry

The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.
Internally, it can now distinguish services by version and will aggregate all nodes of the same version into a single service, as expected by the registry cache and watcher.

https://github.com/owncloud/ocis/pull/9734
https://github.com/owncloud/ocis/pull/9726
https://github.com/owncloud/ocis/pull/9656
20 changes: 14 additions & 6 deletions ocis-pkg/natsjsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO
return err
}
return n.store.Write(&store.Record{
Key: s.Name + _serviceDelimiter + server.DefaultId,
Key: s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version,
Value: b,
Expiry: options.TTL,
})
Expand All @@ -105,7 +105,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()
return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId)
return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version)
}

// GetService gets a specific service from the registry
Expand Down Expand Up @@ -138,20 +138,28 @@ func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Serv
return nil, err
}

svcs := make([]*registry.Service, 0, len(keys))
versions := map[string]*registry.Service{}
for _, k := range keys {
s, err := n.getService(k)
s, err := n.getNode(k)
if err != nil {
// TODO: continue ?
return nil, err
}
if versions[s.Version] == nil {
versions[s.Version] = s
} else {
versions[s.Version].Nodes = append(versions[s.Version].Nodes, s.Nodes...)
}
}
svcs := make([]*registry.Service, 0, len(versions))
for _, s := range versions {
svcs = append(svcs, s)

}
return svcs, nil
}

func (n *storeregistry) getService(s string) (*registry.Service, error) {
// getNode retrieves a node from the store. It returns a service to also keep track of the version.
func (n *storeregistry) getNode(s string) (*registry.Service, error) {
recs, err := n.store.Read(s)
if err != nil {
return nil, err
Expand Down
19 changes: 16 additions & 3 deletions ocis-pkg/natsjsregistry/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package natsjsregistry
import (
"encoding/json"
"errors"
"strings"

natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -48,9 +49,21 @@ func (w *Watcher) Next() (*registry.Result, error) {
}

var svc registry.Service
if err := json.Unmarshal(kve.Value.Data, &svc); err != nil {
_ = w.stop()
return nil, err
if kve.Value.Data == nil {
// fake a service
parts := strings.SplitN(kve.Value.Key, _serviceDelimiter, 3)
if len(parts) != 3 {
return nil, errors.New("invalid service key")
}
svc.Name = parts[0]
// ocis registers nodes with a - separator
svc.Nodes = []*registry.Node{{Id: parts[0] + "-" + parts[1]}}
svc.Version = parts[2]
} else {
if err := json.Unmarshal(kve.Value.Data, &svc); err != nil {
_ = w.stop()
return nil, err
}
}

return &registry.Result{
Expand Down
1 change: 1 addition & 0 deletions ocis-pkg/registry/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func BuildHTTPService(serviceID, address string, version string) *mRegistry.Serv
}

node := &mRegistry.Node{
// This id is read by the registry watcher
Id: serviceID + "-" + server.DefaultId,
Address: net.JoinHostPort(addr, fmt.Sprint(port)),
Metadata: make(map[string]string),
Expand Down

0 comments on commit 339ed19

Please sign in to comment.