Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nats registry fixes #9740

Merged
merged 4 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog/unreleased/fix-natsjskv-registry-2.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Bugfix: Repair nats-js-kv registry

The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.
Internally, it can now distinguish services by version and will aggregate all nodes of the same version into a single service, as expected by the registry cache and watcher.

https://github.com/owncloud/ocis/pull/9734
https://github.com/owncloud/ocis/pull/9726
https://github.com/owncloud/ocis/pull/9656
20 changes: 14 additions & 6 deletions ocis-pkg/natsjsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO
return err
}
return n.store.Write(&store.Record{
Key: s.Name + _serviceDelimiter + server.DefaultId,
Key: s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version,
Value: b,
Expiry: options.TTL,
})
Expand All @@ -105,7 +105,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()
return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId)
return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version)
}

// GetService gets a specific service from the registry
Expand Down Expand Up @@ -138,20 +138,28 @@ func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Serv
return nil, err
}

svcs := make([]*registry.Service, 0, len(keys))
versions := map[string]*registry.Service{}
for _, k := range keys {
s, err := n.getService(k)
s, err := n.getNode(k)
if err != nil {
// TODO: continue ?
return nil, err
}
if versions[s.Version] == nil {
versions[s.Version] = s
} else {
versions[s.Version].Nodes = append(versions[s.Version].Nodes, s.Nodes...)
}
}
svcs := make([]*registry.Service, 0, len(versions))
for _, s := range versions {
svcs = append(svcs, s)

}
return svcs, nil
}

func (n *storeregistry) getService(s string) (*registry.Service, error) {
// getNode retrieves a node from the store. It returns a service to also keep track of the version.
func (n *storeregistry) getNode(s string) (*registry.Service, error) {
recs, err := n.store.Read(s)
if err != nil {
return nil, err
Expand Down
19 changes: 16 additions & 3 deletions ocis-pkg/natsjsregistry/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package natsjsregistry
import (
"encoding/json"
"errors"
"strings"

natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -48,9 +49,21 @@ func (w *Watcher) Next() (*registry.Result, error) {
}

var svc registry.Service
if err := json.Unmarshal(kve.Value.Data, &svc); err != nil {
_ = w.stop()
return nil, err
if kve.Value.Data == nil {
// fake a service
parts := strings.SplitN(kve.Value.Key, _serviceDelimiter, 3)
if len(parts) != 3 {
return nil, errors.New("invalid service key")
}
svc.Name = parts[0]
// ocis registers nodes with a - separator
svc.Nodes = []*registry.Node{{Id: parts[0] + "-" + parts[1]}}
svc.Version = parts[2]
} else {
if err := json.Unmarshal(kve.Value.Data, &svc); err != nil {
_ = w.stop()
return nil, err
}
}

return &registry.Result{
Expand Down
1 change: 1 addition & 0 deletions ocis-pkg/registry/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func BuildHTTPService(serviceID, address string, version string) *mRegistry.Serv
}

node := &mRegistry.Node{
// This id is read by the registry watcher
Id: serviceID + "-" + server.DefaultId,
Address: net.JoinHostPort(addr, fmt.Sprint(port)),
Metadata: make(map[string]string),
Expand Down