Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r310] query-frontend: Do not break scheduler connection on malformed queries #9791

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ const (

type enqueueResult struct {
status enqueueStatus
// If the status is failed and if it was because of a client error on the frontend,
// the clientErr should be updated with the appropriate error.
clientErr error

cancelCh chan<- uint64 // Channel that can be used for request cancellation. If nil, cancellation is not possible.
}
Expand Down Expand Up @@ -285,6 +288,11 @@ enqueueAgain:
cancelCh = enqRes.cancelCh
break // go wait for response.
} else if enqRes.status == failed {
if enqRes.clientErr != nil {
// It failed because of a client error. No need to retry.
return nil, nil, httpgrpc.Errorf(http.StatusBadRequest, "failed to enqueue request: %s", enqRes.clientErr.Error())
}

retries--
if retries > 0 {
spanLogger.DebugLog("msg", "enqueuing request failed, will retry")
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ func (w *frontendSchedulerWorker) enqueueRequest(loop schedulerpb.SchedulerForFr
frontendToSchedulerRequest, err := w.toSchedulerAdapter.frontendToSchedulerEnqueueRequest(req, w.frontendAddr)
if err != nil {
level.Warn(spanLogger).Log("msg", "error converting frontend request to scheduler request", "err", err)
req.enqueue <- enqueueResult{status: failed}
return err
req.enqueue <- enqueueResult{status: failed, clientErr: err}
return nil
}

err = loop.Send(frontendToSchedulerRequest)
Expand Down
60 changes: 51 additions & 9 deletions pkg/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,59 @@ func TestFrontendTooManyRequests(t *testing.T) {
require.Equal(t, int32(http.StatusTooManyRequests), resp.Code)
}

func TestFrontendEnqueueFailure(t *testing.T) {
f, _ := setupFrontend(t, nil, func(*Frontend, *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
func TestFrontendEnqueueFailures(t *testing.T) {
t.Run("scheduler is shutting down with valid query", func(t *testing.T) {
f, _ := setupFrontend(t, nil, func(*Frontend, *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
})

req := &httpgrpc.HTTPRequest{
Url: "/api/v1/query_range?start=946684800&end=946771200&step=60&query=up",
}
_, _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), req)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "failed to enqueue request"))
})
t.Run("scheduler is running fine", func(t *testing.T) {
f, _ := setupFrontend(t, nil, func(*Frontend, *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
})

req := &httpgrpc.HTTPRequest{
Url: "/api/v1/query_range?start=946684800&end=946771200&step=60",
}
_, _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), req)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "failed to enqueue request"))
cases := []struct {
name, url, error string
}{
{
name: "start time is wrong",
url: "/api/v1/query_range?start=9466camnsd84800&end=946771200&step=60&query=up{}",
error: `rpc error: code = Code(400) desc = failed to enqueue request: invalid parameter "start": cannot parse "9466camnsd84800" to a valid timestamp`,
},
{
name: "end time is wrong",
url: "/api/v1/query_range?start=946684800&end=946771200dgiu&step=60&query=up{}",
error: `rpc error: code = Code(400) desc = failed to enqueue request: invalid parameter "end": cannot parse "946771200dgiu" to a valid timestamp`,
},
{
name: "query time is wrong",
url: "/api/v1/query_range?start=946684800&end=946771200&step=60&query=up{",
error: `rpc error: code = Code(400) desc = failed to enqueue request: invalid parameter "query": 1:4: parse error: unexpected end of input inside braces`,
},
{
name: "no query provided",
url: "/api/v1/query_range?start=946684800&end=946771200&step=60",
error: `rpc error: code = Code(400) desc = failed to enqueue request: invalid parameter "query": unknown position: parse error: no expression found in input`,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
req := &httpgrpc.HTTPRequest{
Url: c.url,
}
_, _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), req)
require.Error(t, err)
require.Equal(t, c.error, err.Error())
})
}
})
}

func TestFrontendCancellation(t *testing.T) {
Expand Down
Loading