Skip to content

Commit

Permalink
add "retry" networkservice.Client wrapper for nscs
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Tingaikin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin committed Oct 31, 2021
1 parent f20e20a commit c104398
Show file tree
Hide file tree
Showing 3 changed files with 299 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ issues:
- path: pkg/networkservice/utils/checks/checkerror/server_test.go
linters:
- dupl
- path: pkg/networkservice/common/retry/client.go
linters:
- dupl
text: "lines are duplicate of"
- path: pkg/networkservice/utils/checks/checkerror/client_test.go
linters:
Expand Down
135 changes: 135 additions & 0 deletions pkg/networkservice/common/retry/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package retry provides a networkservice.NetworksrviceClient wrapper that allows to retries requests and closes.
package retry

import (
"context"
"time"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

// Settings represents retry policy settings.
type Settings struct {
Timeout time.Duration
Interval time.Duration
TryTimeout time.Duration
}

type retryClient struct {
Settings
client networkservice.NetworkServiceClient
}

// Option configuress retry.Client instance.
type Option func(*retryClient)

// WithSettings sets retry policy settings for the retry.Client instance.
func WithSettings(s Settings) Option {
return func(rc *retryClient) {
rc.Settings = s
}
}

// NewClient - returns a connect chain element
func NewClient(client networkservice.NetworkServiceClient, opts ...Option) networkservice.NetworkServiceClient {
var result = &retryClient{
Settings: Settings{
Interval: time.Millisecond * 100,
TryTimeout: time.Second,
Timeout: (time.Second + time.Millisecond*100) * 15,
},
client: client,
}

for _, opt := range opts {
opt(result)
}

return result
}

func (r *retryClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
logger := log.FromContext(ctx).WithField("retryClient", "Request")
c := clock.FromContext(ctx)

if r.Timeout > 0 {
var cancel func()
ctx, cancel = c.WithTimeout(ctx, r.Timeout)
defer cancel()
}

for ctx.Err() == nil {
requestCtx, cancel := c.WithTimeout(ctx, r.TryTimeout)
resp, err := r.client.Request(requestCtx, request, opts...)
cancel()

if err != nil {
logger.Errorf("try attempt has failed: %v", err.Error())

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-c.After(r.Interval):
continue
}
}

return resp, err
}

return nil, ctx.Err()
}

func (r *retryClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) {
logger := log.FromContext(ctx).WithField("retryClient", "Close")
c := clock.FromContext(ctx)

if r.Timeout > 0 {
var cancel func()
ctx, cancel = c.WithTimeout(ctx, r.Timeout)
defer cancel()
}

for ctx.Err() == nil {
closeCtx, cancel := c.WithTimeout(ctx, r.TryTimeout)

resp, err := r.client.Close(closeCtx, conn, opts...)
cancel()

if err != nil {
logger.Errorf("try attempt has failed: %v", err.Error())

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-c.After(r.Interval):
continue
}
}

return resp, err
}

return nil, ctx.Err()
}
161 changes: 161 additions & 0 deletions pkg/networkservice/common/retry/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry_test

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkcontext"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/clockmock"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/retry"
)

type remoteSideClient struct {
delay time.Duration
failRequestCount int32
failCloseCount int32
}

func (c *remoteSideClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
if atomic.AddInt32(&c.failRequestCount, -1) == -1 {
return next.Client(ctx).Request(ctx, request, opts...)
}

clock.FromContext(ctx).Sleep(c.delay)
return nil, errors.New("cannot connect")
}

func (c *remoteSideClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
if atomic.AddInt32(&c.failCloseCount, -1) == -1 {
return next.Client(ctx).Close(ctx, conn, opts...)
}

clock.FromContext(ctx).Sleep(c.delay)
return nil, errors.New("cannot connect")
}

func Test_RetryClient_Request(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

var counter = new(count.Client)

var client = retry.NewClient(chain.NewNetworkServiceClient(
counter,
&remoteSideClient{
delay: time.Millisecond * 10,
failRequestCount: 5,
},
), retry.WithSettings(retry.Settings{
Interval: time.Millisecond * 10,
Timeout: time.Second / 2,
TryTimeout: time.Second / 30,
}))

var _, err = client.Request(context.Background(), nil)
require.NoError(t, err)
require.Equal(t, 6, counter.Requests())
require.Equal(t, 0, counter.Closes())
}

func Test_RetryClient_Request_ContextHasCorrectDeadline(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clockMock := clockmock.New(ctx)
clockMock.SetSpeed(0)

ctx = clock.WithClock(ctx, clockMock)

expectedDeadline := clockMock.Now().Add(time.Hour)

var client = retry.NewClient(chain.NewNetworkServiceClient(
checkcontext.NewClient(t, func(t *testing.T, c context.Context) {
v, ok := c.Deadline()
require.True(t, ok)
require.Equal(t, expectedDeadline, v)

}),
), retry.WithSettings(retry.Settings{
TryTimeout: time.Hour,
}))

var _, err = client.Request(ctx, nil)
require.NoError(t, err)
}

func Test_RetryClient_Close_ContextHasCorrectDeadline(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

var client = retry.NewClient(chain.NewNetworkServiceClient(
checkcontext.NewClient(t, func(t *testing.T, c context.Context) {
time.Now().Hour()
v, ok := c.Deadline()
require.True(t, ok)
require.Equal(t, time.Now().Hour()+1, v.Hour())
}),
), retry.WithSettings(retry.Settings{
TryTimeout: time.Hour,
}))

var _, err = client.Close(context.Background(), nil)
require.NoError(t, err)
}

func Test_RetryClient_Close(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clockMock := clockmock.New(ctx)
clockMock.SetSpeed(0)

ctx = clock.WithClock(ctx, clockMock)

expectedDeadline := clockMock.Now().Add(time.Hour)

var client = retry.NewClient(chain.NewNetworkServiceClient(
checkcontext.NewClient(t, func(t *testing.T, c context.Context) {
v, ok := c.Deadline()
require.True(t, ok)
require.Equal(t, expectedDeadline, v)

}),
), retry.WithSettings(retry.Settings{
TryTimeout: time.Hour,
}))

var _, err = client.Close(ctx, nil)
require.NoError(t, err)
}

0 comments on commit c104398

Please sign in to comment.