Skip to content

Commit

Permalink
Adjustments to README and consistency of callback options. (#555)
Browse files Browse the repository at this point in the history
Signed-off-by: bwplotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka authored Apr 4, 2023
1 parent 0e1142d commit 5ca0c41
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 116 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ This list covers known interceptors that users use for their Go microservices (b
All paths should work with `go get <path>`.

#### Auth
* [`github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth`](interceptors/auth) - a customizable (via `AuthFunc`) piece of auth middleware.
* [`github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth`](interceptors/auth) - a customizable via `AuthFunc` piece of auth middleware.
* (external) [`google.golang.org/grpc/authz`](https://github.com/grpc/grpc-go/blob/master/authz/grpc_authz_server_interceptors.go) - more complex, customizable via auth polices (RBAC like), piece of auth middleware.

#### Observability
* Metrics with [`github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus`](providers/prometheus) - Prometheus client-side and server-side monitoring middleware. Supports exemplars. Moved from deprecated now [`go-grpc-prometheus`](https://github.com/grpc-ecosystem/go-grpc-prometheus). It's a separate module, so core module has limited number of dependencies.
Expand All @@ -54,8 +55,9 @@ All paths should work with `go get <path>`.
* (external) [`github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing`](https://pkg.go.dev/github.com/grpc-ecosystem/go-grpc-middleware@v1.4.0/tracing/opentracing) - deprecated [OpenTracing](http://opentracing.io/) client-side and server-side interceptors if you still need it!

#### Client
* [`github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry`](interceptors/retry) - a generic gRPC response code retry mechanism, client-side middleware
* [`github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/timeout`](interceptors/timeout) - a generic gRPC request timeout, client-side middleware
* [`github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry`](interceptors/retry) - a generic gRPC response code retry mechanism, client-side middleware.
* NOTE: grpc-go has native retries too with advanced policies (https://github.com/grpc/grpc-go/blob/v1.54.0/examples/features/retry/client/main.go)
* [`github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/timeout`](interceptors/timeout) - a generic gRPC request timeout, client-side middleware.

#### Server
* [`github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator`](interceptors/validator) - codegen inbound message validation from `.proto` options.
Expand Down
2 changes: 2 additions & 0 deletions interceptors/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ServiceAuthFuncOverride interface {
}

// UnaryServerInterceptor returns a new unary server interceptors that performs per-request auth.
// NOTE(bwplotka): For more complex auth interceptor see https://github.com/grpc/grpc-go/blob/master/authz/grpc_authz_server_interceptors.go.
func UnaryServerInterceptor(authFunc AuthFunc) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
var newCtx context.Context
Expand All @@ -49,6 +50,7 @@ func UnaryServerInterceptor(authFunc AuthFunc) grpc.UnaryServerInterceptor {
}

// StreamServerInterceptor returns a new unary server interceptors that performs per-request auth.
// NOTE(bwplotka): For more complex auth interceptor see https://github.com/grpc/grpc-go/blob/master/authz/grpc_authz_server_interceptors.go.
func StreamServerInterceptor(authFunc AuthFunc) grpc.StreamServerInterceptor {
return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
var newCtx context.Context
Expand Down
9 changes: 5 additions & 4 deletions interceptors/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
package retry

import (
"context"
"math/rand"
"time"
)

// BackoffLinear is very simple: it waits for a fixed period of time between calls.
func BackoffLinear(waitBetween time.Duration) BackoffFunc {
return func(attempt uint) time.Duration {
return func(ctx context.Context, attempt uint) time.Duration {
return waitBetween
}
}
Expand All @@ -31,7 +32,7 @@ func exponentBase2(a uint) uint {
// BackoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
// For example waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
func BackoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) BackoffFunc {
return func(attempt uint) time.Duration {
return func(ctx context.Context, attempt uint) time.Duration {
return jitterUp(waitBetween, jitterFraction)
}
}
Expand All @@ -40,15 +41,15 @@ func BackoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64)
// The scalar is multiplied times 2 raised to the current attempt. So the first
// retry with a scalar of 100ms is 100ms, while the 5th attempt would be 1.6s.
func BackoffExponential(scalar time.Duration) BackoffFunc {
return func(attempt uint) time.Duration {
return func(ctx context.Context, attempt uint) time.Duration {
return scalar * time.Duration(exponentBase2(attempt))
}
}

// BackoffExponentialWithJitter creates an exponential backoff like
// BackoffExponential does, but adds jitter.
func BackoffExponentialWithJitter(scalar time.Duration, jitterFraction float64) BackoffFunc {
return func(attempt uint) time.Duration {
return func(ctx context.Context, attempt uint) time.Duration {
return jitterUp(scalar*time.Duration(exponentBase2(attempt)), jitterFraction)
}
}
25 changes: 3 additions & 22 deletions interceptors/retry/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ var (
perCallTimeout: 0, // disabled
includeHeader: true,
codes: DefaultRetriableCodes,
backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt)
}),
backoffFunc: BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
onRetryCallback: OnRetryCallback(func(ctx context.Context, attempt uint, err error) {
logTrace(ctx, "grpc_retry attempt: %d, backoff for %v", attempt, err)
}),
Expand All @@ -37,16 +35,8 @@ var (
// They are called with an identifier of the attempt, and should return a time the system client should
// hold off for. If the time returned is longer than the `context.Context.Deadline` of the request
// the deadline of the request takes precedence and the wait will be interrupted before proceeding
// with the next iteration.
type BackoffFunc func(attempt uint) time.Duration

// BackoffFuncContext denotes a family of functions that control the backoff duration between call retries.
//
// They are called with an identifier of the attempt, and should return a time the system client should
// hold off for. If the time returned is longer than the `context.Context.Deadline` of the request
// the deadline of the request takes precedence and the wait will be interrupted before proceeding
// with the next iteration. The context can be used to extract request scoped metadata and context values.
type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration
type BackoffFunc func(ctx context.Context, attempt uint) time.Duration

// OnRetryCallback is the type of function called when a retry occurs.
type OnRetryCallback func(ctx context.Context, attempt uint, err error)
Expand All @@ -67,15 +57,6 @@ func WithMax(maxRetries uint) CallOption {

// WithBackoff sets the `BackoffFunc` used to control time between retries.
func WithBackoff(bf BackoffFunc) CallOption {
return CallOption{applyFunc: func(o *options) {
o.backoffFunc = BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
return bf(attempt)
})
}}
}

// WithBackoffContext sets the `BackoffFuncContext` used to control time between retries.
func WithBackoffContext(bf BackoffFuncContext) CallOption {
return CallOption{applyFunc: func(o *options) {
o.backoffFunc = bf
}}
Expand Down Expand Up @@ -124,7 +105,7 @@ type options struct {
perCallTimeout time.Duration
includeHeader bool
codes []codes.Code
backoffFunc BackoffFuncContext
backoffFunc BackoffFunc
onRetryCallback OnRetryCallback
}

Expand Down
69 changes: 0 additions & 69 deletions interceptors/retry/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,40 +178,6 @@ func (s *RetrySuite) TestUnary_OverrideFromDialOpts() {
require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made")
}

func (s *RetrySuite) TestUnary_PerCallDeadline_Succeeds() {
s.T().Skip("TODO(bwplotka): Mock time & unskip, this is too flaky on GH Actions.")

// This tests 5 requests, with first 4 sleeping for 10 millisecond, and the retry logic firing
// a retry call with a 5 millisecond deadline. The 5th one doesn't sleep and succeeds.
deadlinePerCall := 5 * time.Millisecond
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.NoError(s.T(), err, "the fifth invocation should succeed")
require.NotNil(s.T(), out, "Pong must be not nil")
require.EqualValues(s.T(), 5, s.srv.requestCount(), "five requests should have been made")
}

func (s *RetrySuite) TestUnary_PerCallDeadline_FailsOnParent() {
s.T().Skip("TODO(bwplotka): Mock time & unskip, this is too flaky on GH Actions.")

// This tests that the parent context (passed to the invocation) takes precedence over retries.
// The parent context has 150 milliseconds of deadline.
// Each failed call sleeps for 100milliseconds, and there is 5 milliseconds between each one.
// This means that unlike in TestUnary_PerCallDeadline_Succeeds, the fifth successful call won't
// be made.
parentDeadline := 150 * time.Millisecond
deadlinePerCall := 50 * time.Millisecond
// All 0-4 requests should have 10 millisecond sleeps and deadline, while the last one works.
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
ctx, cancel := context.WithTimeout(context.TODO(), parentDeadline)
defer cancel()
_, err := s.Client.Ping(ctx, testpb.GoodPing, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.Error(s.T(), err, "the retries must fail due to context deadline exceeded")
require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
}

func (s *RetrySuite) TestUnary_OnRetryCallbackCalled() {
retryCallbackCount := 0

Expand Down Expand Up @@ -243,41 +209,6 @@ func (s *RetrySuite) TestServerStream_OverrideFromContext() {
require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made")
}

func (s *RetrySuite) TestServerStream_PerCallDeadline_Succeeds() {
s.T().Skip("TODO(bwplotka): Mock time & unskip, this is too flaky on GH Actions.")

// This tests 5 requests, with first 4 sleeping for 100 millisecond, and the retry logic firing
// a retry call with a 50 millisecond deadline. The 5th one doesn't sleep and succeeds.
deadlinePerCall := 100 * time.Millisecond
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.NoError(s.T(), err, "establishing the connection must always succeed")
s.assertPingListWasCorrect(stream)
require.EqualValues(s.T(), 5, s.srv.requestCount(), "three requests should have been made")
}

func (s *RetrySuite) TestServerStream_PerCallDeadline_FailsOnParent() {
s.T().Skip("TODO(bwplotka): Mock time & unskip, this is too flaky on GH Actions.")

// This tests that the parent context (passed to the invocation) takes precedence over retries.
// The parent context has 150 milliseconds of deadline.
// Each failed call sleeps for 50milliseconds, and there is 25 milliseconds between each one.
// This means that unlike in TestServerStream_PerCallDeadline_Succeeds, the fifth successful call won't
// be made.
parentDeadline := 150 * time.Millisecond
deadlinePerCall := 50 * time.Millisecond
// All 0-4 requests should have 10 millisecond sleeps and deadline, while the last one works.
s.srv.resetFailingConfiguration(5, codes.NotFound, 2*deadlinePerCall)
parentCtx, cancel := context.WithTimeout(context.TODO(), parentDeadline)
defer cancel()
stream, err := s.Client.PingList(parentCtx, testpb.GoodPingList, WithPerRetryTimeout(deadlinePerCall),
WithMax(5))
require.NoError(s.T(), err, "establishing the connection must always succeed")
_, err = stream.Recv()
require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
}

func (s *RetrySuite) TestServerStream_OnRetryCallbackCalled() {
retryCallbackCount := 0

Expand Down
6 changes: 3 additions & 3 deletions interceptors/validator/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
o := evaluateOpts(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if err := validate(ctx, req, o.shouldFailFast, o.onValidationErrFunc); err != nil {
if err := validate(ctx, req, o.shouldFailFast, o.onValidationErrCallback); err != nil {
return nil, err
}
return handler(ctx, req)
Expand All @@ -32,7 +32,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
o := evaluateOpts(opts)
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if err := validate(ctx, req, o.shouldFailFast, o.onValidationErrFunc); err != nil {
if err := validate(ctx, req, o.shouldFailFast, o.onValidationErrCallback); err != nil {
return err
}
return invoker(ctx, method, req, reply, cc, opts...)
Expand Down Expand Up @@ -68,7 +68,7 @@ func (s *recvWrapper) RecvMsg(m any) error {
if err := s.ServerStream.RecvMsg(m); err != nil {
return err
}
if err := validate(s.Context(), m, s.shouldFailFast, s.onValidationErrFunc); err != nil {
if err := validate(s.Context(), m, s.shouldFailFast, s.onValidationErrCallback); err != nil {
return err
}
return nil
Expand Down
12 changes: 6 additions & 6 deletions interceptors/validator/interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func TestValidatorTestSuite(t *testing.T) {
sWithOnErrFuncArgs := &ValidatorTestSuite{
InterceptorTestSuite: &testpb.InterceptorTestSuite{
ServerOpts: []grpc.ServerOption{
grpc.StreamInterceptor(validator.StreamServerInterceptor(validator.WithOnValidationErrFunc(onErr))),
grpc.UnaryInterceptor(validator.UnaryServerInterceptor(validator.WithOnValidationErrFunc(onErr))),
grpc.StreamInterceptor(validator.StreamServerInterceptor(validator.WithOnValidationErrCallback(onErr))),
grpc.UnaryInterceptor(validator.UnaryServerInterceptor(validator.WithOnValidationErrCallback(onErr))),
},
},
}
Expand All @@ -128,8 +128,8 @@ func TestValidatorTestSuite(t *testing.T) {
sAll := &ValidatorTestSuite{
InterceptorTestSuite: &testpb.InterceptorTestSuite{
ServerOpts: []grpc.ServerOption{
grpc.StreamInterceptor(validator.StreamServerInterceptor(validator.WithFailFast(), validator.WithOnValidationErrFunc(onErr))),
grpc.UnaryInterceptor(validator.UnaryServerInterceptor(validator.WithFailFast(), validator.WithOnValidationErrFunc(onErr))),
grpc.StreamInterceptor(validator.StreamServerInterceptor(validator.WithFailFast(), validator.WithOnValidationErrCallback(onErr))),
grpc.UnaryInterceptor(validator.UnaryServerInterceptor(validator.WithFailFast(), validator.WithOnValidationErrCallback(onErr))),
},
},
}
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestValidatorTestSuite(t *testing.T) {
csWithOnErrFuncArgs := &ClientValidatorTestSuite{
InterceptorTestSuite: &testpb.InterceptorTestSuite{
ServerOpts: []grpc.ServerOption{
grpc.UnaryInterceptor(validator.UnaryServerInterceptor(validator.WithOnValidationErrFunc(onErr))),
grpc.UnaryInterceptor(validator.UnaryServerInterceptor(validator.WithOnValidationErrCallback(onErr))),
},
},
}
Expand All @@ -169,7 +169,7 @@ func TestValidatorTestSuite(t *testing.T) {
csAll := &ClientValidatorTestSuite{
InterceptorTestSuite: &testpb.InterceptorTestSuite{
ClientOpts: []grpc.DialOption{
grpc.WithUnaryInterceptor(validator.UnaryClientInterceptor(validator.WithFailFast(), validator.WithOnValidationErrFunc(onErr))),
grpc.WithUnaryInterceptor(validator.UnaryClientInterceptor(validator.WithFailFast(), validator.WithOnValidationErrCallback(onErr))),
},
},
}
Expand Down
12 changes: 6 additions & 6 deletions interceptors/validator/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

type options struct {
shouldFailFast bool
onValidationErrFunc OnValidationErr
shouldFailFast bool
onValidationErrCallback OnValidationErrCallback
}
type Option func(*options)

Expand All @@ -21,12 +21,12 @@ func evaluateOpts(opts []Option) *options {
return optCopy
}

type OnValidationErr func(ctx context.Context, err error)
type OnValidationErrCallback func(ctx context.Context, err error)

// WithOnValidationErrFunc registers function that will be invoked on validation error(s).
func WithOnValidationErrFunc(onValidationErrFunc OnValidationErr) Option {
// WithOnValidationErrCallback registers function that will be invoked on validation error(s).
func WithOnValidationErrCallback(onValidationErrCallback OnValidationErrCallback) Option {
return func(o *options) {
o.onValidationErrFunc = onValidationErrFunc
o.onValidationErrCallback = onValidationErrCallback
}
}

Expand Down
6 changes: 3 additions & 3 deletions interceptors/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type validatorLegacy interface {
Validate() error
}

func validate(ctx context.Context, reqOrRes interface{}, shouldFailFast bool, onValidationErrFunc OnValidationErr) (err error) {
func validate(ctx context.Context, reqOrRes interface{}, shouldFailFast bool, onValidationErrCallback OnValidationErrCallback) (err error) {
if shouldFailFast {
switch v := reqOrRes.(type) {
case validatorLegacy:
Expand All @@ -50,8 +50,8 @@ func validate(ctx context.Context, reqOrRes interface{}, shouldFailFast bool, on
return nil
}

if onValidationErrFunc != nil {
onValidationErrFunc(ctx, err)
if onValidationErrCallback != nil {
onValidationErrCallback(ctx, err)
}
return status.Error(codes.InvalidArgument, err.Error())
}

0 comments on commit 5ca0c41

Please sign in to comment.