-
Notifications
You must be signed in to change notification settings - Fork 521
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Correctly cancel GRPC context beneath the HTTP server (#3443)
* cancel context Signed-off-by: Joe Elliott <number101010@gmail.com> * update dskit Signed-off-by: Joe Elliott <number101010@gmail.com> * focused timeouts Signed-off-by: Joe Elliott <number101010@gmail.com> * docs Signed-off-by: Joe Elliott <number101010@gmail.com> * lint N docs Signed-off-by: Joe Elliott <number101010@gmail.com> * more lint Signed-off-by: Joe Elliott <number101010@gmail.com> * make update-mod Signed-off-by: Joe Elliott <number101010@gmail.com> --------- Signed-off-by: Joe Elliott <number101010@gmail.com>
- Loading branch information
1 parent
ae083c3
commit eb81b92
Showing
46 changed files
with
4,313 additions
and
124 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package interceptor | ||
|
||
import ( | ||
"context" | ||
"strings" | ||
"time" | ||
|
||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
const streamingQuerierPrefix = "/tempopb.StreamingQuerier/" | ||
|
||
func NewFrontendAPIUnaryTimeout(timeout time.Duration) grpc.UnaryServerInterceptor { | ||
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { | ||
if strings.HasPrefix(info.FullMethod, streamingQuerierPrefix) { | ||
var cancel context.CancelFunc | ||
ctx, cancel = context.WithTimeout(ctx, timeout) | ||
defer cancel() | ||
} | ||
|
||
return handler(ctx, req) | ||
} | ||
} | ||
|
||
func NewFrontendAPIStreamTimeout(timeout time.Duration) grpc.StreamServerInterceptor { | ||
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { | ||
ctx := ss.Context() | ||
if strings.HasPrefix(info.FullMethod, streamingQuerierPrefix) { | ||
var cancel context.CancelFunc | ||
ctx, cancel = context.WithTimeout(ss.Context(), timeout) | ||
defer cancel() | ||
} | ||
|
||
return handler(srv, &grpc_middleware.WrappedServerStream{ | ||
ServerStream: ss, | ||
WrappedContext: ctx, | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package interceptor | ||
|
||
import ( | ||
"context" | ||
"net" | ||
"testing" | ||
"time" | ||
|
||
"github.com/grafana/tempo/pkg/gogocodec" | ||
"github.com/grafana/tempo/pkg/tempopb" | ||
"github.com/stretchr/testify/require" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/credentials/insecure" | ||
"google.golang.org/grpc/encoding" | ||
) | ||
|
||
func TestInterceptorsCancelContextForStreaming(t *testing.T) { | ||
encoding.RegisterCodec(gogocodec.NewCodec()) | ||
|
||
interceptorTimeout := time.Second | ||
apiTimeout := time.Second * 5 | ||
|
||
unaryInt := NewFrontendAPIUnaryTimeout(interceptorTimeout) | ||
streamInt := NewFrontendAPIStreamTimeout(interceptorTimeout) | ||
|
||
serv := grpc.NewServer(grpc.UnaryInterceptor(unaryInt), grpc.StreamInterceptor(streamInt)) | ||
defer serv.GracefulStop() | ||
|
||
srv := &mockService{apiTimeout} | ||
tempopb.RegisterStreamingQuerierServer(serv, srv) | ||
tempopb.RegisterPusherServer(serv, srv) | ||
|
||
listener, err := net.Listen("tcp", "localhost:0") | ||
require.NoError(t, err) | ||
|
||
go func() { | ||
require.NoError(t, serv.Serve(listener)) | ||
}() | ||
|
||
conn, err := grpc.Dial(listener.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(10e6), grpc.MaxCallSendMsgSize(10e6))) | ||
require.NoError(t, err) | ||
defer func() { | ||
require.NoError(t, conn.Close()) | ||
}() | ||
|
||
// test that a streaming client has its context cancelled after the interceptor timeout and before the api timeout | ||
c := tempopb.NewStreamingQuerierClient(conn) | ||
client, err := c.Search(context.Background(), &tempopb.SearchRequest{}) | ||
require.NoError(t, err) | ||
|
||
start := time.Now() | ||
_, err = client.Recv() | ||
require.EqualError(t, err, "rpc error: code = DeadlineExceeded desc = context deadline exceeded") | ||
require.LessOrEqual(t, time.Since(start), apiTimeout) // confirm that we didn't wait for the full api timeout | ||
|
||
// test that the pusher client does not have its context cancelled and waits for the full api timeout | ||
pc := tempopb.NewPusherClient(conn) | ||
|
||
start = time.Now() | ||
_, err = pc.PushBytesV2(context.Background(), &tempopb.PushBytesRequest{}) | ||
require.NoError(t, err) | ||
require.GreaterOrEqual(t, time.Since(start), apiTimeout) // confirm that we did wait for the full api timeout | ||
} | ||
|
||
type mockService struct { | ||
apiTimeout time.Duration | ||
} | ||
|
||
func (s *mockService) Search(_ *tempopb.SearchRequest, ss tempopb.StreamingQuerier_SearchServer) error { | ||
select { | ||
case <-time.After(s.apiTimeout): | ||
case <-ss.Context().Done(): | ||
return ss.Context().Err() | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (s *mockService) PushBytes(ctx context.Context, _ *tempopb.PushBytesRequest) (*tempopb.PushResponse, error) { | ||
select { | ||
case <-time.After(s.apiTimeout): | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
} | ||
|
||
return &tempopb.PushResponse{}, nil | ||
} | ||
|
||
func (s *mockService) PushBytesV2(ctx context.Context, _ *tempopb.PushBytesRequest) (*tempopb.PushResponse, error) { | ||
select { | ||
case <-time.After(s.apiTimeout): | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
} | ||
|
||
return &tempopb.PushResponse{}, nil | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Oops, something went wrong.