Skip to content

Commit

Permalink
test StressTest for begin server
Browse files Browse the repository at this point in the history
Signed-off-by: Nikita Skrynnik <nikita.skrynnik@xored.com>
  • Loading branch information
NikitaSkrynnik committed Oct 30, 2023
1 parent 28095fb commit df9144c
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 27 deletions.
3 changes: 0 additions & 3 deletions pkg/registry/common/begin/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func BenchmarkBegin_RegisterUnregisterSameIDs(b *testing.B) {
func BenchmarkBegin_RegisterDifferentIDs(b *testing.B) {
server := chain.NewNetworkServiceEndpointRegistryServer(
begin.NewNetworkServiceEndpointRegistryServer(),
&dataRaceServer{count: 0},
)

var wg sync.WaitGroup
Expand All @@ -145,7 +144,6 @@ func BenchmarkBegin_RegisterDifferentIDs(b *testing.B) {
func BenchmarkBegin_UnregisterDifferentIDs(b *testing.B) {
server := chain.NewNetworkServiceEndpointRegistryServer(
begin.NewNetworkServiceEndpointRegistryServer(),
&dataRaceServer{count: 0},
)

var wg sync.WaitGroup
Expand All @@ -165,7 +163,6 @@ func BenchmarkBegin_UnregisterDifferentIDs(b *testing.B) {
func BenchmarkBegin_RegisterUnregisterDifferentIDs(b *testing.B) {
server := chain.NewNetworkServiceEndpointRegistryServer(
begin.NewNetworkServiceEndpointRegistryServer(),
&dataRaceServer{count: 0},
)

var wg sync.WaitGroup
Expand Down
21 changes: 9 additions & 12 deletions pkg/registry/common/begin/nse_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/sdk/pkg/registry/common/grpcmetadata"
"github.com/networkservicemesh/sdk/pkg/registry/core/next"
Expand Down Expand Up @@ -91,30 +92,26 @@ func (b *beginNSEClient) Unregister(ctx context.Context, in *registry.NetworkSer
if fromContext(ctx) != nil {
return next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...)
}
eventFactoryClient, ok := b.Load(id)
if !ok {
return new(empty.Empty), nil
}
var emp *empty.Empty
eventFactoryClient, _ := b.LoadOrStore(id, newEventNSEFactoryClient(ctx, func() { b.Delete(id) }))
var err error
<-eventFactoryClient.executor.AsyncExec(func() {
// If the connection is not established, don't do anything
if eventFactoryClient.state != established || eventFactoryClient.client == nil || eventFactoryClient.registration == nil {
return
}

// If this isn't the connection we started with, do nothing
currentEventFactoryClient, _ := b.Load(id)
if currentEventFactoryClient != eventFactoryClient {
_, err = b.Unregister(ctx, in, opts...)
return
}
registration := in
if eventFactoryClient.registration != nil {
registration = eventFactoryClient.registration
}
// Always close with the last valid Connection we got
withEventFactoryCtx := withEventFactory(ctx, eventFactoryClient)
emp, err = next.NetworkServiceEndpointRegistryClient(withEventFactoryCtx).Unregister(withEventFactoryCtx, eventFactoryClient.registration, opts...)
_, err = next.NetworkServiceEndpointRegistryClient(withEventFactoryCtx).Unregister(withEventFactoryCtx, registration, opts...)
// afterCloseFunc() is used to cleanup things like the entry in the Map for EventFactories
eventFactoryClient.afterCloseFunc()
})
return emp, err
return &emptypb.Empty{}, err
}

// NewNetworkServiceEndpointRegistryClient - returns a new null client that does nothing but call next.NetworkServiceEndpointRegistryClient(ctx).
Expand Down
7 changes: 5 additions & 2 deletions pkg/registry/common/begin/nse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,17 @@ func (b *beginNSEServer) Unregister(ctx context.Context, in *registry.NetworkSer
}
eventFactoryServer, _ := b.LoadOrStore(id, newNSEEventFactoryServer(ctx, func() {
b.Delete(id)

}))

var err error
<-eventFactoryServer.executor.AsyncExec(func() {
currentServerClient, _ := b.Load(id)
if currentServerClient != eventFactoryServer {
currentEventFactoryServer, _ := b.Load(id)
if currentEventFactoryServer != eventFactoryServer {
_, err = b.Unregister(ctx, in)
return
}

registration := in
if eventFactoryServer.registration != nil {
registration = eventFactoryServer.registration
Expand Down
5 changes: 0 additions & 5 deletions pkg/registry/common/begin/serialize_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func TestSerializeClient_StressTest(t *testing.T) {
type parallelClient struct {
t *testing.T
states sync.Map
mu sync.Mutex
}

func newParallelClient(t *testing.T) *parallelClient {
Expand All @@ -79,12 +78,10 @@ func (s *parallelClient) Register(ctx context.Context, in *registry.NetworkServi
raw, _ := s.states.LoadOrStore(in.GetName(), new(int32))
statePtr := raw.(*int32)

s.mu.Lock()
state := atomic.LoadInt32(statePtr)
if !atomic.CompareAndSwapInt32(statePtr, state, state+1) {
assert.Failf(s.t, "", "state has been changed for connection %s expected %d actual %d", in.GetName(), state, atomic.LoadInt32(statePtr))
}
s.mu.Unlock()

return next.NetworkServiceEndpointRegistryClient(ctx).Register(ctx, in, opts...)
}
Expand All @@ -97,12 +94,10 @@ func (s *parallelClient) Unregister(ctx context.Context, in *registry.NetworkSer
raw, _ := s.states.LoadOrStore(in.GetName(), new(int32))
statePtr := raw.(*int32)

s.mu.Lock()
state := atomic.LoadInt32(statePtr)
if !atomic.CompareAndSwapInt32(statePtr, state, state+1) {
assert.Failf(s.t, "", "state has been changed for connection %s expected %d actual %d", in.GetName(), state, atomic.LoadInt32(statePtr))
}
s.mu.Unlock()

return next.NetworkServiceEndpointRegistryClient(ctx).Unregister(ctx, in, opts...)
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/registry/common/begin/serialize_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,15 @@ func newParallelServer(t *testing.T) *parallelServer {
type parallelServer struct {
t *testing.T
states sync.Map
mu sync.Mutex
}

func (s *parallelServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
raw, _ := s.states.LoadOrStore(in.GetName(), new(int32))
statePtr := raw.(*int32)

s.mu.Lock()
state := atomic.LoadInt32(statePtr)
assert.True(s.t, atomic.CompareAndSwapInt32(statePtr, state, state+1), "[Register] state has been changed for connection %s expected %d actual %d", in.GetName(), state, atomic.LoadInt32(statePtr))
s.mu.Unlock()

return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in)
}

Expand All @@ -97,9 +95,8 @@ func (s *parallelServer) Unregister(ctx context.Context, in *registry.NetworkSer
raw, _ := s.states.LoadOrStore(in.GetName(), new(int32))
statePtr := raw.(*int32)

s.mu.Lock()
state := atomic.LoadInt32(statePtr)
assert.True(s.t, atomic.CompareAndSwapInt32(statePtr, state, state+1), "[Unregister] state has been changed for connection %s expected %d actual %d", in.GetName(), state, atomic.LoadInt32(statePtr))
s.mu.Unlock()

return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in)
}

0 comments on commit df9144c

Please sign in to comment.