Skip to content

Commit

Permalink
add synchronization for begin.Unregister()
Browse files Browse the repository at this point in the history
Signed-off-by: Nikita Skrynnik <nikita.skrynnik@xored.com>
  • Loading branch information
NikitaSkrynnik committed Oct 16, 2023
1 parent 9c01d74 commit a59de61
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 1 deletion.
37 changes: 36 additions & 1 deletion pkg/registry/common/begin/nse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package begin

import (
"context"
"sync/atomic"

"github.com/edwarnicke/genericsync"
"github.com/golang/protobuf/ptypes/empty"
Expand All @@ -30,8 +31,14 @@ import (
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type queue struct {
eventCount *atomic.Int64
queueChan chan func()
}

type beginNSEServer struct {
genericsync.Map[string, *eventNSEFactoryServer]
queueMap genericsync.Map[string, *queue]
}

func (b *beginNSEServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
Expand Down Expand Up @@ -92,7 +99,35 @@ func (b *beginNSEServer) Unregister(ctx context.Context, in *registry.NetworkSer
}
eventFactoryServer, ok := b.Load(id)
if !ok {
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in)
q, loaded := b.queueMap.LoadOrStore(id, &queue{eventCount: &atomic.Int64{}, queueChan: make(chan func())})
if !loaded {
log.FromContext(ctx).Infof("Haven't found a queue. Starting a new one: %v", q)
go func() {
for event := range q.queueChan {
log.FromContext(ctx).Infof("Got a new event")
q.eventCount.Add(-1)
event()

if q.eventCount.Load() == int64(0) {
log.FromContext(ctx).Infof("All events have been processed. Closing event factory...")
b.queueMap.Delete(id)
close(q.queueChan)
return
}
}
}()
}

q.eventCount.Add(1)
waitCtx, cancel := context.WithCancel(ctx)
var err error
q.queueChan <- func() {
_, err = next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in)
cancel()
}

<-waitCtx.Done()
return &emptypb.Empty{}, err
}
var err error
<-eventFactoryServer.executor.AsyncExec(func() {
Expand Down
86 changes: 86 additions & 0 deletions pkg/registry/common/begin/nse_server_data_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package begin_test

import (
"context"
"sync"
"testing"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/stretchr/testify/require"

"github.com/networkservicemesh/sdk/pkg/registry/common/begin"
"github.com/networkservicemesh/sdk/pkg/registry/core/chain"
"github.com/networkservicemesh/sdk/pkg/registry/core/next"

"go.uber.org/goleak"
)

const (
eventCount = 10
)

type dataRaceServer struct {
count int
}

func (s *dataRaceServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in)

}

func (s *dataRaceServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error {
return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server)
}

func (s *dataRaceServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
s.count = s.count + 1
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in)
}

func TestServerDataRaceOnUnregister(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

datarace := &dataRaceServer{count: 0}
server := chain.NewNetworkServiceEndpointRegistryServer(
begin.NewNetworkServiceEndpointRegistryServer(),
datarace,
)
id := "1"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

nse := &registry.NetworkServiceEndpoint{
Name: id,
}

var wg sync.WaitGroup
wg.Add(eventCount)

for i := 0; i < eventCount; i++ {
go func() {
server.Unregister(ctx, nse)
wg.Done()
}()
}

wg.Wait()

require.Equal(t, datarace.count, eventCount)
}

0 comments on commit a59de61

Please sign in to comment.