Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Support retry on milvus errors and retry on ErrInconsistentRequery #675

Merged
merged 4 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions client/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,16 @@ func (c *GrpcClient) HybridSearch(ctx context.Context, collName string, partitio
RankParams: params,
}

result, err := c.Service.HybridSearch(ctx, req)

err = merr.CheckRPCCall(result, err)
r, err := RetryOnMilvusErrors(ctx, func() (interface{}, error) {
return c.Service.HybridSearch(ctx, req)
}, OnMerrCodes(merr.Code(merr.ErrInconsistentRequery)))
if err != nil {
return nil, err
}
result := r.(*milvuspb.SearchResults)
if err = handleRespStatus(result.GetStatus()); err != nil {
return nil, err
}

return c.handleSearchResult(schema, outputFields, nq, result)
}
Expand Down Expand Up @@ -117,10 +121,13 @@ func (c *GrpcClient) Search(ctx context.Context, collName string, partitions []s
return nil, err
}

resp, err := c.Service.Search(ctx, req)
r, err := RetryOnMilvusErrors(ctx, func() (interface{}, error) {
return c.Service.Search(ctx, req)
}, OnMerrCodes(merr.Code(merr.ErrInconsistentRequery)))
if err != nil {
return nil, err
}
resp := r.(*milvuspb.SearchResults)
if err := handleRespStatus(resp.GetStatus()); err != nil {
return nil, err
}
Expand Down
141 changes: 141 additions & 0 deletions client/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"
"time"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
)

type config struct {
attempts uint
sleep time.Duration
maxSleepTime time.Duration
merrCodes []int32
}

func newDefaultConfig() *config {
return &config{
attempts: uint(10),
sleep: 200 * time.Millisecond,
maxSleepTime: 3 * time.Second,
}
}

// Option is used to config the retry function.
type Option func(*config)

// Attempts is used to config the max retry times.
func Attempts(attempts uint) Option {
return func(c *config) {
c.attempts = attempts
}
}

// Sleep is used to config the initial interval time of each execution.
func Sleep(sleep time.Duration) Option {
return func(c *config) {
c.sleep = sleep
// ensure max retry interval is always larger than retry interval
if c.sleep*2 > c.maxSleepTime {
c.maxSleepTime = 2 * c.sleep
}
}
}

// MaxSleepTime is used to config the max interval time of each execution.
func MaxSleepTime(maxSleepTime time.Duration) Option {
return func(c *config) {
// ensure max retry interval is always larger than retry interval
if c.sleep*2 > maxSleepTime {
c.maxSleepTime = 2 * c.sleep
} else {
c.maxSleepTime = maxSleepTime
}
}
}

func OnMerrCodes(codes ...int32) Option {
return func(c *config) {
c.merrCodes = append(c.merrCodes, codes...)
}
}

func contains(codes []int32, target int32) bool {
for _, c := range codes {
if c == target {
return true
}
}
return false
}

func RetryOnMilvusErrors(ctx context.Context, fn func() (interface{}, error), opts ...Option) (interface{}, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

c := newDefaultConfig()
for _, opt := range opts {
opt(c)
}

if len(c.merrCodes) == 0 {
return fn()
}

var lastResp interface{}
for i := uint(0); i < c.attempts; i++ {
resp, err := fn()
if err != nil {
return resp, err
}
var code int32
switch r := resp.(type) {
case interface{ GetStatus() *commonpb.Status }:
code = r.GetStatus().GetCode()
case interface{ GetCode() int32 }:
code = r.GetCode()
default:
return resp, nil
}
if code == 0 || !contains(c.merrCodes, code) {
return resp, nil
}

deadline, ok := ctx.Deadline()
if ok && time.Until(deadline) < c.sleep {
return resp, context.DeadlineExceeded
}

lastResp = resp

select {
case <-time.After(c.sleep):
case <-ctx.Done():
return lastResp, ctx.Err()
}

c.sleep *= 2
if c.sleep > c.maxSleepTime {
c.sleep = c.maxSleepTime
}
}
return lastResp, nil
}
180 changes: 180 additions & 0 deletions client/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
)

func TestRetryOnMilvusErrors(t *testing.T) {
ctx := context.Background()

n := 0
testFn := func() (interface{}, error) {
if n < 3 {
n++
return &commonpb.Status{
Code: 0,
}, nil
}
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100))
assert.Equal(t, int32(0), resp.(interface{ GetCode() int32 }).GetCode())
assert.NoError(t, err)
t.Log(resp)
}

func TestOnNoCode(t *testing.T) {
ctx := context.Background()

n := 0
testFn := func() (interface{}, error) {
if n < 3 {
n++
return &commonpb.Status{
Code: 0,
}, nil
}
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn)
assert.Equal(t, int32(0), resp.(interface{ GetCode() int32 }).GetCode())
assert.NoError(t, err)
t.Log(resp)
}

func TestReturnErr(t *testing.T) {
ctx := context.Background()

testFn := func() (interface{}, error) {
return nil, errors.New("mock err")
}

_, err := RetryOnMilvusErrors(ctx, testFn)
assert.Error(t, err)
t.Log(err)
}

func TestAttempts(t *testing.T) {
ctx := context.Background()

testFn := func() (interface{}, error) {
t.Log("executed")
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100), Attempts(1))
assert.Equal(t, int32(100), resp.(interface{ GetCode() int32 }).GetCode())
assert.NoError(t, err)
t.Log(resp)
}

func TestMaxSleepTime(t *testing.T) {
ctx := context.Background()

testFn := func() (interface{}, error) {
t.Log("executed")
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100), Attempts(3), MaxSleepTime(200*time.Millisecond))
assert.Equal(t, int32(100), resp.(interface{ GetCode() int32 }).GetCode())
assert.NoError(t, err)
t.Log(resp)
}

func TestSleep(t *testing.T) {
ctx := context.Background()

testFn := func() (interface{}, error) {
t.Log("executed")
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100), Attempts(3), Sleep(500*time.Millisecond))
assert.Equal(t, int32(100), resp.(interface{ GetCode() int32 }).GetCode())
assert.NoError(t, err)
t.Log(resp)
}

func TestContextDeadline(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

testFn := func() (interface{}, error) {
t.Log("executed")
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100))
assert.Equal(t, int32(100), resp.(interface{ GetCode() int32 }).GetCode())
assert.Error(t, err)
assert.ErrorIs(t, err, context.DeadlineExceeded)
t.Log(resp)
}

func TestContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

testFn := func() (interface{}, error) {
t.Log("executed")
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100))
assert.Equal(t, int32(100), resp.(interface{ GetCode() int32 }).GetCode())
assert.Error(t, err)
assert.ErrorIs(t, err, context.Canceled)
t.Log(resp)
t.Log(err)
}
3 changes: 3 additions & 0 deletions merr/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ var (

// import
ErrImportFailed = newMilvusError("importing data failed", 2100, false)

// Search/Query related
ErrInconsistentRequery = newMilvusError("inconsistent requery result", 2200, true)
)

type milvusError struct {
Expand Down
Loading
Loading