Skip to content

Commit

Permalink
rework expire nse server chain element (#630)
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Tingajkin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin authored Dec 28, 2020
1 parent 89944c4 commit 4e30068
Show file tree
Hide file tree
Showing 28 changed files with 62 additions and 66 deletions.
9 changes: 6 additions & 3 deletions pkg/networkservice/chains/nsmgr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ package nsmgr

import (
"context"
"time"

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
"github.com/networkservicemesh/sdk/pkg/registry/common/expire"
"github.com/networkservicemesh/sdk/pkg/registry/common/querycache"
"github.com/networkservicemesh/sdk/pkg/registry/core/next"

Expand All @@ -38,11 +40,11 @@ import (

"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"

"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/common/setid"
"github.com/networkservicemesh/sdk/pkg/registry/common/seturl"
chain_registry "github.com/networkservicemesh/sdk/pkg/registry/core/chain"
"github.com/networkservicemesh/sdk/pkg/registry/core/nextwrap"
"github.com/networkservicemesh/sdk/pkg/registry/memory"

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/connect"
Expand Down Expand Up @@ -128,10 +130,11 @@ func NewServer(ctx context.Context, nsmRegistration *registryapi.NetworkServiceE

nseChain := chain_registry.NewNamedNetworkServiceEndpointRegistryServer(
nsmRegistration.Name+".NetworkServiceEndpointRegistry",
expire.NewNetworkServiceEndpointRegistryServer(time.Minute),
newRecvFDEndpointRegistry(), // Allow to receive a passed files
urlsRegistryServer,
interposeRegistry, // Store cross connect NSEs
localbypassRegistryServer, // Store endpoint Id to EndpointURL for local access.
interposeRegistry, // Store cross connect NSEs
localbypassRegistryServer, // Store endpoint Id to EndpointURL for local access.
seturl.NewNetworkServiceEndpointRegistryServer(nsmRegistration.Url), // Remember endpoint URL
nseRegistry, // Register NSE inside Remote registry with ID assigned
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/common/discover/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (

"github.com/networkservicemesh/sdk/pkg/tools/clienturlctx"

"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/common/setid"
"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
"github.com/networkservicemesh/sdk/pkg/registry/memory"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/registry"
Expand Down
5 changes: 3 additions & 2 deletions pkg/registry/chains/memory/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,26 @@ package memory
import (
"context"
"net/url"
"time"

"github.com/networkservicemesh/api/pkg/api/registry"
"google.golang.org/grpc"

registryserver "github.com/networkservicemesh/sdk/pkg/registry"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/expire"
"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/common/proxy"
"github.com/networkservicemesh/sdk/pkg/registry/common/setid"
"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
"github.com/networkservicemesh/sdk/pkg/registry/core/chain"
"github.com/networkservicemesh/sdk/pkg/registry/memory"
)

// NewServer creates new registry server based on memory storage
func NewServer(ctx context.Context, proxyRegistryURL *url.URL, options ...grpc.DialOption) registryserver.Registry {
nseChain := chain.NewNetworkServiceEndpointRegistryServer(
setid.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(time.Minute),
memory.NewNetworkServiceEndpointRegistryServer(),
proxy.NewNetworkServiceEndpointRegistryServer(proxyRegistryURL),
connect.NewNetworkServiceEndpointRegistryServer(ctx, func(ctx context.Context, cc grpc.ClientConnInterface) registry.NetworkServiceEndpointRegistryClient {
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/common/clienturl/ns_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"github.com/networkservicemesh/sdk/pkg/registry/common/clienturl"

"github.com/networkservicemesh/sdk/pkg/registry/memory"
"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/common/clienturl/nse_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/registry/common/clienturl"
"github.com/networkservicemesh/sdk/pkg/registry/memory"
"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/common/connect/ns_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel"
"github.com/networkservicemesh/sdk/pkg/registry/memory"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/common/connect/nse_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel"
"github.com/networkservicemesh/sdk/pkg/registry/memory"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/common/expire/ns_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (n *nsServer) checkUpdates() {
} else {
atomic.AddInt32(stored, 1)
}
duration := time.Until(time.Unix(nse.ExpirationTime.Seconds, int64(nse.ExpirationTime.Nanos)))
duration := time.Until(nse.ExpirationTime.AsTime())
timer := time.AfterFunc(duration, func() {
if atomic.AddInt32(stored, -1) <= 0 {
if ctx, ok := n.contexts.Load(ns); ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/common/expire/ns_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
"github.com/stretchr/testify/require"

"github.com/networkservicemesh/sdk/pkg/registry/common/expire"
"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/common/setid"
"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/registry/memory"
)

const testPeriod = time.Millisecond * 50
Expand Down
29 changes: 14 additions & 15 deletions pkg/registry/common/expire/nse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,23 @@ import (
)

type nseServer struct {
timers timerMap
timers timerMap
nseExpiration time.Duration
}

func (n *nseServer) Register(ctx context.Context, nse *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
r, err := next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, nse)
if err != nil {
return nil, err
}
if nse.ExpirationTime == nil {
nse.ExpirationTime = &timestamp.Timestamp{}
}
if nse.ExpirationTime.Seconds != 0 || nse.ExpirationTime.Nanos != 0 {
duration := time.Until(time.Unix(nse.ExpirationTime.Seconds, int64(nse.ExpirationTime.Nanos)))
timer := time.AfterFunc(duration, func() {
_, _ = next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, nse)
})
if t, load := n.timers.LoadOrStore(nse.Name, timer); load {
timer.Stop()
t.Reset(duration)
}
expirationTime := time.Now().Add(n.nseExpiration)
nse.ExpirationTime = &timestamp.Timestamp{Seconds: expirationTime.Unix(), Nanos: int32(expirationTime.Nanosecond())}
timer := time.AfterFunc(n.nseExpiration, func() {
_, _ = next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, nse)
})
if t, load := n.timers.LoadOrStore(nse.Name, timer); load {
timer.Stop()
t.Reset(n.nseExpiration)
}
return r, nil
}
Expand All @@ -69,6 +66,8 @@ func (n *nseServer) Unregister(ctx context.Context, nse *registry.NetworkService
}

// NewNetworkServiceEndpointRegistryServer wraps passed NetworkServiceEndpointRegistryServer and monitor Network service endpoints
func NewNetworkServiceEndpointRegistryServer() registry.NetworkServiceEndpointRegistryServer {
return &nseServer{}
func NewNetworkServiceEndpointRegistryServer(nseExpiration time.Duration) registry.NetworkServiceEndpointRegistryServer {
return &nseServer{
nseExpiration: nseExpiration,
}
}
13 changes: 3 additions & 10 deletions pkg/registry/common/expire/nse_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ import (

"go.uber.org/goleak"

"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/registry/memory"

"github.com/golang/protobuf/ptypes/timestamp"
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/stretchr/testify/require"

Expand All @@ -36,14 +35,8 @@ import (

func TestNewNetworkServiceEndpointRegistryServer(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
s := next.NewNetworkServiceEndpointRegistryServer(expire.NewNetworkServiceEndpointRegistryServer(), memory.NewNetworkServiceEndpointRegistryServer())
expiration := time.Now().Add(testPeriod * 2)
_, err := s.Register(context.Background(), &registry.NetworkServiceEndpoint{
ExpirationTime: &timestamp.Timestamp{
Seconds: expiration.Unix(),
Nanos: int32(expiration.Nanosecond()),
},
})
s := next.NewNetworkServiceEndpointRegistryServer(expire.NewNetworkServiceEndpointRegistryServer(testPeriod*2), memory.NewNetworkServiceEndpointRegistryServer())
_, err := s.Register(context.Background(), &registry.NetworkServiceEndpoint{})
require.Nil(t, err)
c := adapters.NetworkServiceEndpointServerToClient(s)
stream, err := c.Find(context.Background(), &registry.NetworkServiceEndpointQuery{
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (n *networkServiceRegistryServer) Register(ctx context.Context, ns *registr
n.networkServices.Store(r.Name, r.Clone())
n.executor.AsyncExec(func() {
for _, ch := range n.eventChannels {
ch <- r
ch <- r.Clone()
}
})
return r, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/protobuf/proto"

"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel"
"github.com/networkservicemesh/sdk/pkg/registry/memory"
)

func TestNetworkServiceRegistryServer_RegisterAndFind(t *testing.T) {
Expand All @@ -50,16 +51,17 @@ func TestNetworkServiceRegistryServer_RegisterAndFind(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan *registry.NetworkService, 1)
defer close(ch)
_ = s.Find(&registry.NetworkServiceQuery{
NetworkService: &registry.NetworkService{
Name: "a",
},
}, streamchannel.NewNetworkServiceFindServer(ctx, ch))

require.Equal(t, &registry.NetworkService{
expected := &registry.NetworkService{
Name: "a",
}, <-ch)
close(ch)
}
require.True(t, proto.Equal(expected, <-ch))
}

func TestNetworkServiceRegistryServer_RegisterAndFindWatch(t *testing.T) {
Expand All @@ -82,7 +84,9 @@ func TestNetworkServiceRegistryServer_RegisterAndFindWatch(t *testing.T) {
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan *registry.NetworkService, 1)
defer close(ch)
go func() {
_ = s.Find(&registry.NetworkServiceQuery{
Watch: true,
Expand All @@ -100,8 +104,5 @@ func TestNetworkServiceRegistryServer_RegisterAndFindWatch(t *testing.T) {
Name: "a",
})
require.NoError(t, err)
require.Equal(t, expected, <-ch)

cancel()
close(ch)
require.True(t, proto.Equal(expected, <-ch))
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (n *networkServiceEndpointRegistryServer) Register(ctx context.Context, nse
return nil, err
}
n.networkServiceEndpoints.Store(r.Name, r.Clone())
n.sendEvent(r)
n.sendEvent(r.Clone())
return r, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/protobuf/proto"

"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel"
"github.com/networkservicemesh/sdk/pkg/registry/memory"
)

func TestNetworkServiceEndpointRegistryServer_RegisterAndFind(t *testing.T) {
Expand All @@ -48,7 +49,9 @@ func TestNetworkServiceEndpointRegistryServer_RegisterAndFind(t *testing.T) {
})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan *registry.NetworkServiceEndpoint, 1)
defer close(ch)
_ = s.Find(&registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: "a",
Expand All @@ -58,8 +61,6 @@ func TestNetworkServiceEndpointRegistryServer_RegisterAndFind(t *testing.T) {
require.Equal(t, &registry.NetworkServiceEndpoint{
Name: "a",
}, <-ch)
cancel()
close(ch)
}

func TestNetworkServiceEndpointRegistryServer_RegisterAndFindWatch(t *testing.T) {
Expand All @@ -84,6 +85,7 @@ func TestNetworkServiceEndpointRegistryServer_RegisterAndFindWatch(t *testing.T)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan *registry.NetworkServiceEndpoint, 1)
defer close(ch)
go func() {
_ = s.Find(&registry.NetworkServiceEndpointQuery{
Watch: true,
Expand All @@ -101,9 +103,7 @@ func TestNetworkServiceEndpointRegistryServer_RegisterAndFindWatch(t *testing.T)
Name: "a",
})
require.NoError(t, err)
require.Equal(t, expected, <-ch)

close(ch)
require.True(t, proto.Equal(expected, <-ch))
}

func TestNetworkServiceEndpointRegistryServer_RegisterAndFindByLabel(t *testing.T) {
Expand All @@ -120,7 +120,9 @@ func TestNetworkServiceEndpointRegistryServer_RegisterAndFindByLabel(t *testing.
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan *registry.NetworkServiceEndpoint, 1)
defer close(ch)
_ = s.Find(&registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
NetworkServiceLabels: map[string]*registry.NetworkServiceLabels{
Expand All @@ -133,9 +135,7 @@ func TestNetworkServiceEndpointRegistryServer_RegisterAndFindByLabel(t *testing.
},
}, streamchannel.NewNetworkServiceEndpointFindServer(ctx, ch))

require.Equal(t, createLabeledNSE2(), <-ch)
cancel()
close(ch)
require.True(t, proto.Equal(createLabeledNSE2(), <-ch))
}

func TestNetworkServiceEndpointRegistryServer_RegisterAndFindByLabelWatch(t *testing.T) {
Expand All @@ -154,6 +154,7 @@ func TestNetworkServiceEndpointRegistryServer_RegisterAndFindByLabelWatch(t *tes
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan *registry.NetworkServiceEndpoint, 1)
defer close(ch)
go func() {
_ = s.Find(&registry.NetworkServiceEndpointQuery{
Watch: true,
Expand All @@ -173,9 +174,7 @@ func TestNetworkServiceEndpointRegistryServer_RegisterAndFindByLabelWatch(t *tes

expected, err := s.Register(context.Background(), createLabeledNSE2())
require.NoError(t, err)
require.Equal(t, expected, <-ch)

close(ch)
require.True(t, proto.Equal(expected, <-ch))
}

func createLabeledNSE1() *registry.NetworkServiceEndpoint {
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion pkg/registry/common/proxy/ns_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
"github.com/networkservicemesh/sdk/pkg/registry/memory"
)

func TestNewProxyNetworkServiceRegistryServer_Register(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/common/proxy/nse_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
"github.com/networkservicemesh/sdk/pkg/registry/memory"
)

func TestNewProxyNetworkServiceEndpointRegistryServer_Register(t *testing.T) {
Expand Down
Loading

0 comments on commit 4e30068

Please sign in to comment.