Skip to content

Commit

Permalink
add synchronization of Register and Unregister Methods + add tests wi…
Browse files Browse the repository at this point in the history
…th the same ID

Signed-off-by: Nikita Skrynnik <nikita.skrynnik@xored.com>
  • Loading branch information
NikitaSkrynnik committed Oct 23, 2023
1 parent 2e9244e commit bd1170d
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 27 deletions.
42 changes: 18 additions & 24 deletions pkg/registry/common/begin/nse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,42 +52,36 @@ func (b *beginNSEServer) Register(ctx context.Context, in *registry.NetworkServi
if fromContext(ctx) != nil {
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in)
}
eventFactoryServer, _ := b.LoadOrStore(id,
newNSEEventFactoryServer(
ctx,
1,
func() {
b.Delete(id)
},
),
)

eventFactoryServer, loaded := b.LoadOrStore(id, newNSEEventFactoryServer(ctx, 1, func() {}))
var resp *registry.NetworkServiceEndpoint
var err error

<-eventFactoryServer.executor.AsyncExec(func() {
currentEventFactoryServer, _ := b.Load(id)
if currentEventFactoryServer != eventFactoryServer {
log.FromContext(ctx).Debug("recalling begin.Request because currentEventFactoryServer != eventFactoryServer")
resp, err = b.Register(ctx, in)
return
if loaded {
done := false
<-eventFactoryServer.executor.AsyncExec(func() {
currentEventFactoryServer, _ := b.Load(id)
if currentEventFactoryServer != eventFactoryServer {
log.FromContext(ctx).Debug("recalling begin.Request because currentEventFactoryServer != eventFactoryServer")
resp, err = b.Register(ctx, in)
done = true
return
}
currentEventFactoryServer.eventCount++
})

if done {
return resp, err
}
})
}

<-eventFactoryServer.executor.AsyncExec(func() {
withEventFactoryCtx := withEventFactory(ctx, eventFactoryServer)
resp, err = next.NetworkServiceEndpointRegistryServer(withEventFactoryCtx).Register(withEventFactoryCtx, in)
if err != nil {
if eventFactoryServer.state != established {
eventFactoryServer.state = closed
b.Delete(id)
}
return
}
eventFactoryServer.registration = mergeNSE(in, resp)
eventFactoryServer.state = established
eventFactoryServer.response = resp
eventFactoryServer.updateContext(grpcmetadata.PathWithContext(ctx, grpcmetadata.PathFromContext(ctx).Clone()))
eventFactoryServer.eventCount--
})
return resp, err
}
Expand Down
74 changes: 71 additions & 3 deletions pkg/registry/common/begin/nse_server_data_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type dataRaceServer struct {
}

func (s *dataRaceServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
s.count++
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in)
}

Expand All @@ -50,11 +51,11 @@ func (s *dataRaceServer) Find(query *registry.NetworkServiceEndpointQuery, serve
}

func (s *dataRaceServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
s.count++
s.count--
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in)
}

func TestServerDataRaceOnUnregister(t *testing.T) {
func TestServer_ConcurrentUnregister(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

datarace := &dataRaceServer{count: 0}
Expand All @@ -77,5 +78,72 @@ func TestServerDataRaceOnUnregister(t *testing.T) {
}

wg.Wait()
require.Equal(t, datarace.count, eventCount)
require.Equal(t, -eventCount, datarace.count)
}

func TestServer_ConcurrentRegister(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

datarace := &dataRaceServer{count: 0}
server := chain.NewNetworkServiceEndpointRegistryServer(
begin.NewNetworkServiceEndpointRegistryServer(),
datarace,
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

var wg sync.WaitGroup
wg.Add(eventCount)

for i := 0; i < eventCount; i++ {
local := i
go func() {
_, err := server.Register(begin.WithID(ctx, local), &registry.NetworkServiceEndpoint{Name: "1"})
require.NoError(t, err)
wg.Done()
}()
}

wg.Wait()
require.Equal(t, eventCount, datarace.count)
}

func TestServer_ConcurrentRegisterUnregister(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

datarace := &dataRaceServer{count: 0}
server := chain.NewNetworkServiceEndpointRegistryServer(
begin.NewNetworkServiceEndpointRegistryServer(),
datarace,
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

var wg sync.WaitGroup
wg.Add(2 * eventCount)

go func() {
for i := 0; i < eventCount; i++ {
local := i
go func() {
_, err := server.Register(begin.WithID(ctx, local), &registry.NetworkServiceEndpoint{Name: "1"})
require.NoError(t, err)
wg.Done()
}()
}
}()

go func() {
for i := 0; i < eventCount; i++ {
local := i
go func() {
_, err := server.Unregister(begin.WithID(ctx, local), &registry.NetworkServiceEndpoint{Name: "1"})
require.NoError(t, err)
wg.Done()
}()
}
}()

wg.Wait()
require.Equal(t, 0, datarace.count)
}

0 comments on commit bd1170d

Please sign in to comment.