Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reuse node id when registering services #9656

Merged
merged 9 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/unreleased/fix-natsjskv-registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Bugfix: Repair nats-js-kv registry

The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.

https://github.com/owncloud/ocis/pull/9656
https://github.com/owncloud/ocis/pull/9662
https://github.com/owncloud/ocis/pull/9654
https://github.com/owncloud/ocis/pull/9620
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ replace github.com/egirna/icap-client => github.com/fschade/icap-client v0.0.0-2

replace github.com/unrolled/secure => github.com/DeepDiver1975/secure v0.0.0-20240611112133-abc838fb797c

replace github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240723073728-b36ea3314b73
replace github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240724102745-4bc93ffd7ab6

// exclude the v2 line of go-sqlite3 which was released accidentally and prevents pulling in newer versions of go-sqlite3
// see https://github.com/mattn/go-sqlite3/issues/965 for more details
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1609,8 +1609,8 @@ github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240723073728-b36ea3314b73 h1:Cgg5BVWG99INUMX43nD5jhZgNzQJyFA0MvZkctNn0Lw=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240723073728-b36ea3314b73/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240724102745-4bc93ffd7ab6 h1:NNXx1/XWR6Ryud6qNanwrl/JuRx2KdCW1jS2/Cf/TO8=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240724102745-4bc93ffd7ab6/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY=
github.com/kolo/xmlrpc v0.0.0-20200310150728-e0350524596b/go.mod h1:o03bZfuBwAXHetKXuInt4S7omeXUu62/A845kiycsSQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
20 changes: 4 additions & 16 deletions ocis-pkg/natsjsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"time"

natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/server"
"go-micro.dev/v4/store"
"go-micro.dev/v4/util/cmd"
)
Expand All @@ -25,7 +25,7 @@ var (
_registryUsernameEnv = "MICRO_REGISTRY_AUTH_USERNAME"
_registryPasswordEnv = "MICRO_REGISTRY_AUTH_PASSWORD"

_serviceDelimiter = "/"
_serviceDelimiter = "@"
)

func init() {
Expand Down Expand Up @@ -90,18 +90,12 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO
o(&options)
}

unique := uuid.New().String()
if s.Metadata == nil {
s.Metadata = make(map[string]string)
}
s.Metadata["uuid"] = unique

b, err := json.Marshal(s)
if err != nil {
return err
}
return n.store.Write(&store.Record{
Key: s.Name + _serviceDelimiter + unique,
Key: s.Name + _serviceDelimiter + server.DefaultId,
Value: b,
Expiry: options.TTL,
})
Expand All @@ -111,13 +105,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()

var unique string
if s.Metadata != nil {
unique = s.Metadata["uuid"]
}

return n.store.Delete(s.Name + _serviceDelimiter + unique)
return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId)
}

// GetService gets a specific service from the registry
Expand Down
36 changes: 13 additions & 23 deletions ocis-pkg/natsjsregistry/watcher.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package natsjsregistry

import (
"encoding/json"
"errors"

natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
)

// NatsWatcher is the watcher of the nats interface
type NatsWatcher interface {
WatchAll(bucket string, opts ...nats.WatchOpt) (nats.KeyWatcher, error)
WatchAll(bucket string, opts ...nats.WatchOpt) (<-chan *natsjskv.StoreUpdate, func() error, error)
}

// Watcher is used to keep track of changes in the registry
type Watcher struct {
watch nats.KeyWatcher
updates <-chan nats.KeyValueEntry
updates <-chan *natsjskv.StoreUpdate
stop func() error
reg *storeregistry
}

Expand All @@ -26,14 +28,14 @@ func NewWatcher(s *storeregistry) (*Watcher, error) {
return nil, errors.New("store does not implement watcher interface")
}

watcher, err := w.WatchAll("service-registry")
watcher, stop, err := w.WatchAll("service-registry")
if err != nil {
return nil, err
}

return &Watcher{
watch: watcher,
updates: watcher.Updates(),
updates: watcher,
stop: stop,
reg: s,
}, nil
}
Expand All @@ -45,30 +47,18 @@ func (w *Watcher) Next() (*registry.Result, error) {
return nil, errors.New("watcher stopped")
}

service, err := w.reg.getService(kve.Key())
if err != nil {
var svc *registry.Service
if err := json.Unmarshal(kve.Value.Data, svc); err != nil {
return nil, err
}

var action string
switch kve.Operation() {
default:
action = "create"
case nats.KeyValuePut:
action = "create"
case nats.KeyValueDelete:
action = "delete"
case nats.KeyValuePurge:
action = "delete"
}

return &registry.Result{
Service: service,
Action: action,
Service: svc,
Action: kve.Action,
}, nil
}

// Stop stops the watcher
func (w *Watcher) Stop() {
_ = w.watch.Stop()
_ = w.stop()
}
9 changes: 5 additions & 4 deletions ocis-pkg/registry/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"strings"

mRegistry "go-micro.dev/v4/registry"
"go-micro.dev/v4/server"
"go-micro.dev/v4/util/addr"
)

func BuildGRPCService(serviceID, uuid, address string, version string) *mRegistry.Service {
func BuildGRPCService(serviceID, address string, version string) *mRegistry.Service {
var host string
var port int

Expand All @@ -28,7 +29,7 @@ func BuildGRPCService(serviceID, uuid, address string, version string) *mRegistr
}

node := &mRegistry.Node{
Id: serviceID + "-" + uuid,
Id: serviceID + "-" + server.DefaultId,
Address: net.JoinHostPort(addr, fmt.Sprint(port)),
Metadata: make(map[string]string),
}
Expand All @@ -46,7 +47,7 @@ func BuildGRPCService(serviceID, uuid, address string, version string) *mRegistr
}
}

func BuildHTTPService(serviceID, uuid, address string, version string) *mRegistry.Service {
func BuildHTTPService(serviceID, address string, version string) *mRegistry.Service {
var host string
var port int

Expand All @@ -64,7 +65,7 @@ func BuildHTTPService(serviceID, uuid, address string, version string) *mRegistr
}

node := &mRegistry.Node{
Id: serviceID + "-" + uuid,
Id: serviceID + "-" + server.DefaultId,
Address: net.JoinHostPort(addr, fmt.Sprint(port)),
Metadata: make(map[string]string),
}
Expand Down
1 change: 1 addition & 0 deletions ocis-pkg/service/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func NewServiceWithClient(client client.Client, opts ...Option) (Service, error)
var mServer server.Server
sopts := newOptions(opts...)
tlsConfig := &tls.Config{}

if sopts.TLSEnabled {
var cert tls.Certificate
var err error
Expand Down
6 changes: 5 additions & 1 deletion ocis/pkg/command/root.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package command

import (
"context"
"os"
"os/signal"
"syscall"

"github.com/owncloud/ocis/v2/ocis-pkg/clihelper"
"github.com/owncloud/ocis/v2/ocis-pkg/config"
Expand All @@ -25,5 +28,6 @@ func Execute() error {
)
}

return app.Run(os.Args)
ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
return app.RunContext(ctx, os.Args)
}
2 changes: 1 addition & 1 deletion ocis/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func Server(cfg *config.Config) *cli.Command {
Action: func(c *cli.Context) error {
// Prefer the in-memory registry as the default when running in single-binary mode
r := runtime.New(cfg)
return r.Start()
return r.Start(c.Context)
},
}
}
Expand Down
6 changes: 4 additions & 2 deletions ocis/pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package runtime

import (
"context"

"github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/ocis/pkg/runtime/service"
)
Expand All @@ -18,6 +20,6 @@ func New(cfg *config.Config) Runtime {
}

// Start rpc runtime
func (r *Runtime) Start() error {
return service.Start(service.WithConfig(r.c))
func (r *Runtime) Start(ctx context.Context) error {
return service.Start(ctx, service.WithConfig(r.c))
}
26 changes: 11 additions & 15 deletions ocis/pkg/runtime/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ import (
"net/http"
"net/rpc"
"os"
"os/signal"
"sort"
"strings"
"syscall"
"time"

authapp "github.com/owncloud/ocis/v2/services/auth-app/pkg/command"
Expand Down Expand Up @@ -96,7 +94,7 @@ type Service struct {
// calls are done explicitly to loadFromEnv().
// Since this is the public constructor, options need to be added, at the moment only logging options
// are supported in order to match the running OwnCloud services structured log.
func NewService(options ...Option) (*Service, error) {
func NewService(ctx context.Context, options ...Option) (*Service, error) {
opts := NewOptions()

for _, f := range options {
Expand All @@ -109,7 +107,7 @@ func NewService(options ...Option) (*Service, error) {
log.Level(opts.Config.Log.Level),
)

globalCtx, cancelGlobal := context.WithCancel(context.Background())
globalCtx, cancelGlobal := context.WithCancel(ctx)

s := &Service{
Services: make([]serviceFuncMap, len(_waitFuncs)),
Expand Down Expand Up @@ -352,19 +350,18 @@ func NewService(options ...Option) (*Service, error) {

// Start a rpc service. By default, the package scope Start will run all default services to provide with a working
// oCIS instance.
func Start(o ...Option) error {
func Start(ctx context.Context, o ...Option) error {
// Start the runtime. Most likely this was called ONLY by the `ocis server` subcommand, but since we cannot protect
// from the caller, the previous statement holds truth.

// prepare a new rpc Service struct.
s, err := NewService(o...)
s, err := NewService(ctx, o...)
if err != nil {
return err
}

// halt listens for interrupt signals and blocks.
halt := make(chan os.Signal, 1)
signal.Notify(halt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
// get a cancel function to stop the service
ctx, cancel := context.WithCancel(ctx)

// tolerance controls backoff cycles from the supervisor.
tolerance := 5
Expand All @@ -376,7 +373,7 @@ func Start(o ...Option) error {
if e.Type() == suture.EventTypeBackoff {
totalBackoff++
if totalBackoff == tolerance {
halt <- os.Interrupt
cancel()
}
}
s.Log.Info().Str("event", e.String()).Msg(fmt.Sprintf("supervisor: %v", e.Map()["supervisor_name"]))
Expand Down Expand Up @@ -424,8 +421,8 @@ func Start(o ...Option) error {
// https://pkg.go.dev/github.com/thejerf/suture/v4@v4.0.0#Supervisor
go s.Supervisor.ServeBackground(s.context)

// trap will block on halt channel for interruptions.
go trap(s, halt)
// trap will block on context done channel for interruptions.
go trap(s, ctx)

for i, service := range s.Services {
scheduleServiceTokens(s, service)
Expand Down Expand Up @@ -508,9 +505,8 @@ func (s *Service) List(_ struct{}, reply *string) error {

// trap blocks on halt channel. When the runtime is interrupted it
// signals the controller to stop any supervised process.
func trap(s *Service, halt chan os.Signal) {
<-halt
s.cancel()
func trap(s *Service, ctx context.Context) {
<-ctx.Done()
for sName := range s.serviceToken {
for i := range s.serviceToken[sName] {
if err := s.Supervisor.Remove(s.serviceToken[sName][i]); err != nil {
Expand Down
7 changes: 6 additions & 1 deletion services/activitylog/cmd/activitylog/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package main

import (
"context"
"os"
"os/signal"
"syscall"

"github.com/owncloud/ocis/v2/services/activitylog/pkg/command"
"github.com/owncloud/ocis/v2/services/activitylog/pkg/config/defaults"
)

func main() {
if err := command.Execute(defaults.DefaultConfig()); err != nil {
cfg := defaults.DefaultConfig()
cfg.Context, _ = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
if err := command.Execute(cfg); err != nil {
os.Exit(1)
}
}
2 changes: 1 addition & 1 deletion services/activitylog/pkg/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ func Execute(cfg *config.Config) error {
Commands: GetCommands(cfg),
})

return app.Run(os.Args)
return app.RunContext(cfg.Context, os.Args)
}
Loading