Skip to content

Commit

Permalink
fix: iterator cursor progress too fast(#36179)
Browse files Browse the repository at this point in the history
Signed-off-by: MrPresent-Han <chun.han@gmail.com>
  • Loading branch information
MrPresent-Han committed Sep 14, 2024
1 parent e7ea1d7 commit 37e895b
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 54 deletions.
4 changes: 2 additions & 2 deletions internal/proto/internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ message RetrieveRequest {
bool is_count = 13;
int64 iteration_extension_reduce_rate = 14;
string username = 15;
bool reduce_stop_for_best = 16;
bool reduce_stop_for_best = 16; //deprecated
int32 reduce_type = 17;
}



message RetrieveResults {
common.MsgBase base = 1;
common.Status status = 2;
Expand Down
49 changes: 37 additions & 12 deletions internal/proxy/task_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/exprutil"
"github.com/milvus-io/milvus/internal/util/reduce"
typeutil2 "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -75,9 +76,9 @@ type queryTask struct {
}

type queryParams struct {
limit int64
offset int64
reduceStopForBest bool
limit int64
offset int64
reduceType reduce.IReduceType
}

// translateToOutputFieldIDs translates output fields name to output fields id.
Expand Down Expand Up @@ -142,6 +143,7 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e
limit int64
offset int64
reduceStopForBest bool
isIterator bool
err error
)
reduceStopForBestStr, err := funcutil.GetAttrByKeyFromRepeatedKV(ReduceStopForBestKey, queryParamsPair)
Expand All @@ -154,10 +156,29 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e
}
}

isIteratorStr, err := funcutil.GetAttrByKeyFromRepeatedKV(IteratorField, queryParamsPair)
// if reduce_stop_for_best is provided
if err == nil {
isIterator, err = strconv.ParseBool(isIteratorStr)
if err != nil {
return nil, merr.WrapErrParameterInvalid("true or false", isIteratorStr,
"value for iterator field is invalid")
}
}

reduceType := reduce.IReduceNoOrder
if isIterator {
if reduceStopForBest {
reduceType = reduce.IReduceInOrderForBest
} else {
reduceType = reduce.IReduceInOrder
}
}

limitStr, err := funcutil.GetAttrByKeyFromRepeatedKV(LimitKey, queryParamsPair)
// if limit is not provided
if err != nil {
return &queryParams{limit: typeutil.Unlimited, reduceStopForBest: reduceStopForBest}, nil
return &queryParams{limit: typeutil.Unlimited, reduceType: reduceType}, nil
}
limit, err = strconv.ParseInt(limitStr, 0, 64)
if err != nil {
Expand All @@ -179,9 +200,9 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e
}

return &queryParams{
limit: limit,
offset: offset,
reduceStopForBest: reduceStopForBest,
limit: limit,
offset: offset,
reduceType: reduceType,
}, nil
}

Expand Down Expand Up @@ -343,7 +364,10 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
if err != nil {
return err
}
t.RetrieveRequest.ReduceStopForBest = queryParams.reduceStopForBest
if queryParams.reduceType == reduce.IReduceInOrderForBest {
t.RetrieveRequest.ReduceStopForBest = true
}
t.RetrieveRequest.ReduceType = int32(queryParams.reduceType)

t.queryParams = queryParams
t.RetrieveRequest.Limit = queryParams.limit + queryParams.offset
Expand Down Expand Up @@ -615,9 +639,10 @@ func reduceRetrieveResults(ctx context.Context, retrieveResults []*internalpb.Re
cursors := make([]int64, len(validRetrieveResults))

if queryParams != nil && queryParams.limit != typeutil.Unlimited {
// reduceStopForBest will try to get as many results as possible
// IReduceInOrderForBest will try to get as many results as possible
// so loopEnd in this case will be set to the sum of all results' size
if !queryParams.reduceStopForBest {
// to get as many qualified results as possible
if reduce.ShouldUseInputLimit(queryParams.reduceType) {
loopEnd = int(queryParams.limit)
}
}
Expand All @@ -626,7 +651,7 @@ func reduceRetrieveResults(ctx context.Context, retrieveResults []*internalpb.Re
if queryParams != nil && queryParams.offset > 0 {
for i := int64(0); i < queryParams.offset; i++ {
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
if sel == -1 || (queryParams.reduceStopForBest && drainOneResult) {
if sel == -1 || (reduce.ShouldStopWhenDrained(queryParams.reduceType) && drainOneResult) {
return ret, nil
}
cursors[sel]++
Expand All @@ -638,7 +663,7 @@ func reduceRetrieveResults(ctx context.Context, retrieveResults []*internalpb.Re
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd; {
sel, drainOneResult := typeutil.SelectMinPK(validRetrieveResults, cursors)
if sel == -1 || (queryParams.reduceStopForBest && drainOneResult) {
if sel == -1 || (reduce.ShouldStopWhenDrained(queryParams.reduceType) && drainOneResult) {
break
}

Expand Down
116 changes: 111 additions & 5 deletions internal/proxy/task_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/reduce"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
Expand Down Expand Up @@ -442,6 +443,101 @@ func TestTaskQuery_functions(t *testing.T) {
}
})

t.Run("test parseQueryParams for reduce type", func(t *testing.T) {
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "True",
})
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "True",
})
ret, err := parseQueryParams(inParams)
assert.NoError(t, err)
assert.Equal(t, reduce.IReduceInOrderForBest, ret.reduceType)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "True",
})
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "TrueXXXX",
})
ret, err := parseQueryParams(inParams)
assert.Error(t, err)
assert.Nil(t, ret)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "TrueXXXXX",
})
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "True",
})
ret, err := parseQueryParams(inParams)
assert.Error(t, err)
assert.Nil(t, ret)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "True",
})
// when not setting iterator tag, ignore reduce_stop_for_best
ret, err := parseQueryParams(inParams)
assert.NoError(t, err)
assert.Equal(t, reduce.IReduceNoOrder, ret.reduceType)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "True",
})
// when not setting reduce_stop_for_best tag, reduce by keep results in order
ret, err := parseQueryParams(inParams)
assert.NoError(t, err)
assert.Equal(t, reduce.IReduceInOrder, ret.reduceType)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "False",
})
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "True",
})
ret, err := parseQueryParams(inParams)
assert.NoError(t, err)
assert.Equal(t, reduce.IReduceInOrder, ret.reduceType)
}
{
var inParams []*commonpb.KeyValuePair
inParams = append(inParams, &commonpb.KeyValuePair{
Key: ReduceStopForBestKey,
Value: "False",
})
inParams = append(inParams, &commonpb.KeyValuePair{
Key: IteratorField,
Value: "False",
})
ret, err := parseQueryParams(inParams)
assert.NoError(t, err)
assert.Equal(t, reduce.IReduceNoOrder, ret.reduceType)
}
})

t.Run("test reduceRetrieveResults", func(t *testing.T) {
const (
Dim = 8
Expand Down Expand Up @@ -600,7 +696,7 @@ func TestTaskQuery_functions(t *testing.T) {
r2.HasMoreResult = false
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: 2, reduceStopForBest: true})
&queryParams{limit: 2, reduceType: reduce.IReduceInOrderForBest})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{11, 11, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
Expand All @@ -613,7 +709,7 @@ func TestTaskQuery_functions(t *testing.T) {
r2.HasMoreResult = true
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: 1, offset: 1, reduceStopForBest: true})
&queryParams{limit: 1, offset: 1, reduceType: reduce.IReduceInOrderForBest})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{11, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
Expand All @@ -624,7 +720,7 @@ func TestTaskQuery_functions(t *testing.T) {
r2.HasMoreResult = true
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: 2, offset: 1, reduceStopForBest: true})
&queryParams{limit: 2, offset: 1, reduceType: reduce.IReduceInOrderForBest})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))

Expand All @@ -637,7 +733,7 @@ func TestTaskQuery_functions(t *testing.T) {
r2.HasMoreResult = false
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: typeutil.Unlimited, reduceStopForBest: true})
&queryParams{limit: typeutil.Unlimited, reduceType: reduce.IReduceInOrderForBest})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{11, 11, 22, 22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
Expand All @@ -648,11 +744,21 @@ func TestTaskQuery_functions(t *testing.T) {
t.Run("test stop reduce for best for unlimited set amd offset", func(t *testing.T) {
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: typeutil.Unlimited, offset: 3, reduceStopForBest: true})
&queryParams{limit: typeutil.Unlimited, offset: 3, reduceType: reduce.IReduceInOrderForBest})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{22}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
})
t.Run("test iterator without setting reduce stop for best", func(t *testing.T) {
r1.HasMoreResult = true
r2.HasMoreResult = true
result, err := reduceRetrieveResults(context.Background(),
[]*internalpb.RetrieveResults{r1, r2},
&queryParams{limit: 1, reduceType: reduce.IReduceInOrder})
assert.NoError(t, err)
assert.Equal(t, 2, len(result.GetFieldsData()))
assert.Equal(t, []int64{11}, result.GetFieldsData()[0].GetScalars().GetLongData().Data)
})
})
})
}
Expand Down
23 changes: 12 additions & 11 deletions internal/querynodev2/segments/default_limit_reducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/util/reduce"
)

type defaultLimitReducer struct {
Expand All @@ -15,24 +16,24 @@ type defaultLimitReducer struct {
}

type mergeParam struct {
limit int64
outputFieldsId []int64
schema *schemapb.CollectionSchema
mergeStopForBest bool
limit int64
outputFieldsId []int64
schema *schemapb.CollectionSchema
reduceType reduce.IReduceType
}

func NewMergeParam(limit int64, outputFieldsId []int64, schema *schemapb.CollectionSchema, reduceStopForBest bool) *mergeParam {
func NewMergeParam(limit int64, outputFieldsId []int64, schema *schemapb.CollectionSchema, reduceType reduce.IReduceType) *mergeParam {
return &mergeParam{
limit: limit,
outputFieldsId: outputFieldsId,
schema: schema,
mergeStopForBest: reduceStopForBest,
limit: limit,
outputFieldsId: outputFieldsId,
schema: schema,
reduceType: reduceType,
}
}

func (r *defaultLimitReducer) Reduce(ctx context.Context, results []*internalpb.RetrieveResults) (*internalpb.RetrieveResults, error) {
reduceParam := NewMergeParam(r.req.GetReq().GetLimit(), r.req.GetReq().GetOutputFieldsId(),
r.schema, r.req.GetReq().GetReduceStopForBest())
r.schema, reduce.ToReduceType(r.req.GetReq().GetReduceType()))
return mergeInternalRetrieveResultsAndFillIfEmpty(ctx, results, reduceParam)
}

Expand All @@ -50,7 +51,7 @@ type defaultLimitReducerSegcore struct {
}

func (r *defaultLimitReducerSegcore) Reduce(ctx context.Context, results []*segcorepb.RetrieveResults, segments []Segment, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
mergeParam := NewMergeParam(r.req.GetReq().GetLimit(), r.req.GetReq().GetOutputFieldsId(), r.schema, r.req.GetReq().GetReduceStopForBest())
mergeParam := NewMergeParam(r.req.GetReq().GetLimit(), r.req.GetReq().GetOutputFieldsId(), r.schema, reduce.ToReduceType(r.req.GetReq().GetReduceType()))
return mergeSegcoreRetrieveResultsAndFillIfEmpty(ctx, results, mergeParam, segments, plan, r.manager)
}

Expand Down
8 changes: 4 additions & 4 deletions internal/querynodev2/segments/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
return ret, nil
}

if param.limit != typeutil.Unlimited && !param.mergeStopForBest {
if param.limit != typeutil.Unlimited && reduce.ShouldUseInputLimit(param.reduceType) {
loopEnd = int(param.limit)
}

Expand All @@ -305,7 +305,7 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd; {
sel, drainOneResult := typeutil.SelectMinPKWithTimestamp(validRetrieveResults, cursors)
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
if sel == -1 || (reduce.ShouldStopWhenDrained(param.reduceType) && drainOneResult) {
break
}

Expand Down Expand Up @@ -422,7 +422,7 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
selected := make([]int, 0, ret.GetAllRetrieveCount())

var limit int = -1
if param.limit != typeutil.Unlimited && !param.mergeStopForBest {
if param.limit != typeutil.Unlimited && reduce.ShouldUseInputLimit(param.reduceType) {
limit = int(param.limit)
}

Expand All @@ -435,7 +435,7 @@ func MergeSegcoreRetrieveResults(ctx context.Context, retrieveResults []*segcore
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
for j := 0; j < loopEnd && (limit == -1 || availableCount < limit); j++ {
sel, drainOneResult := typeutil.SelectMinPKWithTimestamp(validRetrieveResults, cursors)
if sel == -1 || (param.mergeStopForBest && drainOneResult) {
if sel == -1 || (reduce.ShouldStopWhenDrained(param.reduceType) && drainOneResult) {
break
}

Expand Down
Loading

0 comments on commit 37e895b

Please sign in to comment.