Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: iterator cursor progress too fast(#36179) #36180

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/proto/internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ message RetrieveRequest {
bool is_count = 13;
int64 iteration_extension_reduce_rate = 14;
string username = 15;
bool reduce_stop_for_best = 16;
bool reduce_stop_for_best = 16; //deprecated
int32 reduce_type = 17;
}



message RetrieveResults {
common.MsgBase base = 1;
common.Status status = 2;
Expand Down
49 changes: 37 additions & 12 deletions internal/proxy/task_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/exprutil"
"github.com/milvus-io/milvus/internal/util/reduce"
typeutil2 "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -75,9 +76,9 @@
}

type queryParams struct {
limit int64
offset int64
reduceStopForBest bool
limit int64
offset int64
reduceType reduce.IReduceType
}

// translateToOutputFieldIDs translates output fields name to output fields id.
Expand Down Expand Up @@ -142,6 +143,7 @@
limit int64
offset int64
reduceStopForBest bool
isIterator bool
err error
)
reduceStopForBestStr, err := funcutil.GetAttrByKeyFromRepeatedKV(ReduceStopForBestKey, queryParamsPair)
Expand All @@ -154,10 +156,29 @@
}
}

isIteratorStr, err := funcutil.GetAttrByKeyFromRepeatedKV(IteratorField, queryParamsPair)
// if reduce_stop_for_best is provided
if err == nil {
isIterator, err = strconv.ParseBool(isIteratorStr)
if err != nil {
return nil, merr.WrapErrParameterInvalid("true or false", isIteratorStr,
"value for iterator field is invalid")
}
}

reduceType := reduce.IReduceNoOrder
if isIterator {
if reduceStopForBest {
reduceType = reduce.IReduceInOrderForBest
} else {
reduceType = reduce.IReduceInOrder
}
}

limitStr, err := funcutil.GetAttrByKeyFromRepeatedKV(LimitKey, queryParamsPair)
// if limit is not provided
if err != nil {
return &queryParams{limit: typeutil.Unlimited, reduceStopForBest: reduceStopForBest}, nil
return &queryParams{limit: typeutil.Unlimited, reduceType: reduceType}, nil
}
limit, err = strconv.ParseInt(limitStr, 0, 64)
if err != nil {
Expand All @@ -179,9 +200,9 @@
}

return &queryParams{
limit: limit,
offset: offset,
reduceStopForBest: reduceStopForBest,
limit: limit,
offset: offset,
reduceType: reduceType,
}, nil
}

Expand Down Expand Up @@ -343,7 +364,10 @@
if err != nil {
return err
}
t.RetrieveRequest.ReduceStopForBest = queryParams.reduceStopForBest
if queryParams.reduceType == reduce.IReduceInOrderForBest {
t.RetrieveRequest.ReduceStopForBest = true

Check warning on line 368 in internal/proxy/task_query.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_query.go#L368

Added line #L368 was not covered by tests
}
t.RetrieveRequest.ReduceType = int32(queryParams.reduceType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we shall fill both field value for backward compatibility concern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compatibility in the process of rolling upgrade?
this may only result in the number of results not than 'many'


t.queryParams = queryParams
t.RetrieveRequest.Limit = queryParams.limit + queryParams.offset
Expand Down Expand Up @@ -615,9 +639,10 @@
cursors := make([]int64, len(validRetrieveResults))

if queryParams != nil && queryParams.limit != typeutil.Unlimited {
// reduceStopForBest will try to get as many results as possible
// IReduceInOrderForBest will try to get as many results as possible
// so loopEnd in this case will be set to the sum of all results' size
if !queryParams.reduceStopForBest {
// to get as many qualified results as possible
if reduce.ShouldUseInputLimit(queryParams.reduceType) {
loopEnd = int(queryParams.limit)
}
}
Expand All @@ -626,7 +651,7 @@
if queryParams != nil && queryParams.offset > 0 {
for i := int64(0); i < queryParams.offset; i++ {
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
if sel == -1 || (queryParams.reduceStopForBest && drainOneResult) {
if sel == -1 || (reduce.ShouldStopWhenDrained(queryParams.reduceType) && drainOneResult) {
return ret, nil
}
cursors[sel]++
Expand All @@ -638,7 +663,7 @@
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd; {
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
if sel == -1 || (queryParams.reduceStopForBest && drainOneResult) {
if sel == -1 || (reduce.ShouldStopWhenDrained(queryParams.reduceType) && drainOneResult) {
break
}

Expand Down
116 changes: 111 additions & 5 deletions internal/proxy/task_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/reduce"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
Expand Down Expand Up @@ -442,6 +443,101 @@ func TestTaskQuery_functions(t *testing.T) {
}
})

t.Run("test parseQueryParams for reduce type", func(t *testing.T) {
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "True",
})
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "True",
})
ret, err := parseQueryParams(inParams)
assert.NoError(t, err)
assert.Equal(t, reduce.IReduceInOrderForBest, ret.reduceType)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "True",
})
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "TrueXXXX",
})
ret, err := parseQueryParams(inParams)
assert.Error(t, err)
assert.Nil(t, ret)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "TrueXXXXX",
})
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "True",
})
ret, err := parseQueryParams(inParams)
assert.Error(t, err)
assert.Nil(t, ret)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "True",
})
// when not setting iterator tag, ignore reduce_stop_for_best
ret, err := parseQueryParams(inParams)
assert.NoError(t, err)
assert.Equal(t, reduce.IReduceNoOrder, ret.reduceType)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "True",
})
// when not setting reduce_stop_for_best tag, reduce by keep results in order
ret, err := parseQueryParams(inParams)
assert.NoError(t, err)
assert.Equal(t, reduce.IReduceInOrder, ret.reduceType)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "False",
})
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "True",
})
ret, err := parseQueryParams(inParams)
assert.NoError(t, err)
assert.Equal(t, reduce.IReduceInOrder, ret.reduceType)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "False",
})
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "False",
})
ret, err := parseQueryParams(inParams)
assert.NoError(t, err)
assert.Equal(t, reduce.IReduceNoOrder, ret.reduceType)
}
})

t.Run("test reduceRetrieveResults", func(t *testing.T) {
const (
Dim = 8
Expand Down Expand Up @@ -600,7 +696,7 @@ func TestTaskQuery_functions(t *testing.T) {
r2.HasMoreResult = false
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: 2, reduceStopForBest: true})
&queryParams{limit: 2, reduceType: reduce.IReduceInOrderForBest})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{11, 11, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
Expand All @@ -613,7 +709,7 @@ func TestTaskQuery_functions(t *testing.T) {
r2.HasMoreResult = true
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: 1, offset: 1, reduceStopForBest: true})
&queryParams{limit: 1, offset: 1, reduceType: reduce.IReduceInOrderForBest})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{11, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
Expand All @@ -624,7 +720,7 @@ func TestTaskQuery_functions(t *testing.T) {
r2.HasMoreResult = true
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: 2, offset: 1, reduceStopForBest: true})
&queryParams{limit: 2, offset: 1, reduceType: reduce.IReduceInOrderForBest})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))

Expand All @@ -637,7 +733,7 @@ func TestTaskQuery_functions(t *testing.T) {
r2.HasMoreResult = false
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: typeutil.Unlimited, reduceStopForBest: true})
&queryParams{limit: typeutil.Unlimited, reduceType: reduce.IReduceInOrderForBest})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{11, 11, 22, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
Expand All @@ -648,11 +744,21 @@ func TestTaskQuery_functions(t *testing.T) {
t.Run("test stop reduce for best for unlimited set amd offset", func(t *testing.T) {
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: typeutil.Unlimited, offset: 3, reduceStopForBest: true})
&queryParams{limit: typeutil.Unlimited, offset: 3, reduceType: reduce.IReduceInOrderForBest})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
})
t.Run("test iterator without setting reduce stop for best", func(t *testing.T) {
r1.HasMoreResult = true
r2.HasMoreResult = true
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: 1, reduceType: reduce.IReduceInOrder})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{11}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
})
})
})
}
Expand Down
23 changes: 12 additions & 11 deletions internal/querynodev2/segments/default_limit_reducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/util/reduce"
)

type defaultLimitReducer struct {
Expand All @@ -15,24 +16,24 @@ type defaultLimitReducer struct {
}

type mergeParam struct {
limit int64
outputFieldsId []int64
schema *schemapb.CollectionSchema
mergeStopForBest bool
limit int64
outputFieldsId []int64
schema *schemapb.CollectionSchema
reduceType reduce.IReduceType
}

func NewMergeParam(limit int64, outputFieldsId []int64, schema *schemapb.CollectionSchema, reduceStopForBest bool) *mergeParam {
func NewMergeParam(limit int64, outputFieldsId []int64, schema *schemapb.CollectionSchema, reduceType reduce.IReduceType) *mergeParam {
return &mergeParam{
limit: limit,
outputFieldsId: outputFieldsId,
schema: schema,
mergeStopForBest: reduceStopForBest,
limit: limit,
outputFieldsId: outputFieldsId,
schema: schema,
reduceType: reduceType,
}
}

func (r *defaultLimitReducer) Reduce(ctx context.Context, results []*internalpb.RetrieveResults) (*internalpb.RetrieveResults, error) {
reduceParam := NewMergeParam(r.req.GetReq().GetLimit(), r.req.GetReq().GetOutputFieldsId(),
r.schema, r.req.GetReq().GetReduceStopForBest())
r.schema, reduce.ToReduceType(r.req.GetReq().GetReduceType()))
return mergeInternalRetrieveResultsAndFillIfEmpty(ctx, results, reduceParam)
}

Expand All @@ -50,7 +51,7 @@ type defaultLimitReducerSegcore struct {
}

func (r *defaultLimitReducerSegcore) Reduce(ctx context.Context, results []*segcorepb.RetrieveResults, segments []Segment, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
mergeParam := NewMergeParam(r.req.GetReq().GetLimit(), r.req.GetReq().GetOutputFieldsId(), r.schema, r.req.GetReq().GetReduceStopForBest())
mergeParam := NewMergeParam(r.req.GetReq().GetLimit(), r.req.GetReq().GetOutputFieldsId(), r.schema, reduce.ToReduceType(r.req.GetReq().GetReduceType()))
return mergeSegcoreRetrieveResultsAndFillIfEmpty(ctx, results, mergeParam, segments, plan, r.manager)
}

Expand Down
8 changes: 4 additions & 4 deletions internal/querynodev2/segments/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
return ret, nil
}

if param.limit != typeutil.Unlimited && !param.mergeStopForBest {
if param.limit != typeutil.Unlimited && reduce.ShouldUseInputLimit(param.reduceType) {
loopEnd = int(param.limit)
}

Expand All @@ -305,7 +305,7 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd; {
sel, drainOneResult := typeutil.SelectMinPKWithTimestamp(validRetrieveResults, cursors)
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
if sel == -1 || (reduce.ShouldStopWhenDrained(param.reduceType) && drainOneResult) {
break
}

Expand Down Expand Up @@ -416,7 +416,7 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
}

var limit int = -1
if param.limit != typeutil.Unlimited && !param.mergeStopForBest {
if param.limit != typeutil.Unlimited && reduce.ShouldUseInputLimit(param.reduceType) {
limit = int(param.limit)
}

Expand All @@ -438,7 +438,7 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore

for j := 0; j < loopEnd && (limit == -1 || availableCount < limit); j++ {
sel, drainOneResult := typeutil.SelectMinPKWithTimestamp(validRetrieveResults, cursors)
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
if sel == -1 || (reduce.ShouldStopWhenDrained(param.reduceType) && drainOneResult) {
break
}

Expand Down
Loading
Loading