Skip to content

Commit

Permalink
enhance: remove cooling off in rate limiter for read requests
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <yun.zhang@zilliz.com>
  • Loading branch information
jaime0815 committed Sep 3, 2024
1 parent 55df253 commit 78089e6
Show file tree
Hide file tree
Showing 14 changed files with 3 additions and 877 deletions.
25 changes: 0 additions & 25 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -977,31 +977,6 @@ quotaAndLimits:
# forceDeny false means dql requests are allowed (except for some
# specific conditions, such as collection has been dropped), true means always reject all dql requests.
forceDeny: false
queueProtection:
enabled: false
# nqInQueueThreshold indicates that the system is under backpressure on the Search/Query path.
# If the NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates gradually cool off
# until the NQ in queue no longer exceeds nqInQueueThreshold. The NQ of a query request is counted as 1.
# int, default no limit
nqInQueueThreshold: -1
# queueLatencyThreshold indicates that the system is under backpressure on the Search/Query path.
# If the dql queuing latency is greater than queueLatencyThreshold, search&query rates gradually cool off
# until the queuing latency no longer exceeds queueLatencyThreshold.
# The latency here refers to the averaged latency over a period of time.
# milliseconds, default no limit
queueLatencyThreshold: -1
resultProtection:
enabled: false
# maxReadResultRate indicates that the system is under backpressure on the Search/Query path.
# If the dql result rate is greater than maxReadResultRate, search&query rates gradually cool off
# until the read result rate no longer exceeds maxReadResultRate.
# MB/s, default no limit
maxReadResultRate: -1
maxReadResultRatePerDB: -1
maxReadResultRatePerCollection: -1
# coolOffSpeed is the speed at which search&query rates cool off.
# (0, 1]
coolOffSpeed: 0.9

trace:
# trace exporter type, default is stdout,
Expand Down
14 changes: 0 additions & 14 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2908,9 +2908,6 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
request.GetCollectionName(),
).Add(float64(request.GetNq()))

subLabel := GetCollectionRateSubLabel(request)
rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()), subLabel)

if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.SearchResults{
Status: merr.Status(err),
Expand Down Expand Up @@ -3087,7 +3084,6 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
}

metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
}
return qt.result, nil
}
Expand Down Expand Up @@ -3118,13 +3114,6 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
request.GetCollectionName(),
).Add(float64(receiveSize))

subLabel := GetCollectionRateSubLabel(request)
allNQ := int64(0)
for _, searchRequest := range request.Requests {
allNQ += searchRequest.GetNq()
}
rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(allNQ), subLabel)

if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.SearchResults{
Status: merr.Status(err),
Expand Down Expand Up @@ -3284,7 +3273,6 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
}

metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
}
return qt.result, nil
}
Expand Down Expand Up @@ -3593,7 +3581,6 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
).Inc()

sentSize := proto.Size(qt.result)
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))

username := GetCurUserFromContextOrDefault(ctx)
Expand Down Expand Up @@ -6409,5 +6396,4 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR
func DeregisterSubLabel(subLabel string) {
rateCol.DeregisterSubLabel(internalpb.RateType_DQLQuery.String(), subLabel)
rateCol.DeregisterSubLabel(internalpb.RateType_DQLSearch.String(), subLabel)
rateCol.DeregisterSubLabel(metricsinfo.ReadResultThroughput, subLabel)
}
2 changes: 0 additions & 2 deletions internal/proxy/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ func getQuotaMetrics() (*metricsinfo.ProxyQuotaMetrics, error) {
getSubLabelRateMetric(internalpb.RateType_DQLSearch.String())
getRateMetric(internalpb.RateType_DQLQuery.String())
getSubLabelRateMetric(internalpb.RateType_DQLQuery.String())
getRateMetric(metricsinfo.ReadResultThroughput)
getSubLabelRateMetric(metricsinfo.ReadResultThroughput)
if err != nil {
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ func (node *Proxy) initRateCollector() error {
// TODO: add bulkLoad rate
rateCol.Register(internalpb.RateType_DQLSearch.String())
rateCol.Register(internalpb.RateType_DQLQuery.String())
rateCol.Register(metricsinfo.ReadResultThroughput)
return nil
}

Expand Down
25 changes: 0 additions & 25 deletions internal/querynodev2/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,11 @@ var Counter *counter

// RateMetrics returns the metric labels that init registers with the Rate
// collector.
func RateMetrics() []string {
	labels := make([]string, 0, 4)
	labels = append(labels,
		metricsinfo.NQPerSecond,
		metricsinfo.SearchThroughput,
		metricsinfo.InsertConsumeThroughput,
		metricsinfo.DeleteConsumeThroughput,
	)
	return labels
}

// AverageMetrics returns the metric labels that init registers with the
// Average collector.
func AverageMetrics() []string {
	labels := make([]string, 0, 2)
	labels = append(labels,
		metricsinfo.QueryQueueMetric,
		metricsinfo.SearchQueueMetric,
	)
	return labels
}

// ConstructLabel concatenates subs into a single label, placing "-" between
// adjacent parts. It returns the empty string when no parts are given.
func ConstructLabel(subs ...string) string {
	var joined string
	for i := range subs {
		// The separator goes before every part except the first one.
		if i > 0 {
			joined += "-"
		}
		joined += subs[i]
	}
	return joined
}

func init() {
var err error
Rate, err = ratelimitutil.NewRateCollector(ratelimitutil.DefaultWindow, ratelimitutil.DefaultGranularity, false)
Expand All @@ -70,9 +50,4 @@ func init() {
for _, label := range RateMetrics() {
Rate.Register(label)
}
// init average metric

for _, label := range AverageMetrics() {
Average.Register(label)
}
}
47 changes: 0 additions & 47 deletions internal/querynodev2/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package querynodev2
import (
"context"
"fmt"
"time"

"github.com/samber/lo"

Expand Down Expand Up @@ -51,57 +50,13 @@ func getRateMetric() ([]metricsinfo.RateMetric, error) {
return rms, nil
}

// getSearchNQInQueue builds a ReadInfoInQueue snapshot for the search path:
// the ready-queue and execute-queue counters plus the averaged queue duration
// read from collector.Average. The search average is reset when the function
// returns successfully; on error an empty ReadInfoInQueue is returned and
// nothing is reset.
func getSearchNQInQueue() (metricsinfo.ReadInfoInQueue, error) {
	avgQueueTime, err := collector.Average.Average(metricsinfo.SearchQueueMetric)
	if err != nil {
		return metricsinfo.ReadInfoInQueue{}, err
	}
	// Deferred so the reset happens only after the snapshot below is built.
	defer collector.Average.Reset(metricsinfo.SearchQueueMetric)

	var info metricsinfo.ReadInfoInQueue
	info.ReadyQueue = collector.Counter.Get(
		collector.ConstructLabel(metricsinfo.ReadyQueueType, metricsinfo.SearchQueueMetric))
	info.ExecuteChan = collector.Counter.Get(
		collector.ConstructLabel(metricsinfo.ExecuteQueueType, metricsinfo.SearchQueueMetric))
	// NOTE(review): SearchTask.PreExecute appears to add samples in
	// microseconds, while this conversion treats the average as nanoseconds —
	// confirm the unit expected by consumers of AvgQueueDuration.
	info.AvgQueueDuration = time.Duration(int64(avgQueueTime))
	return info, nil
}

// getQueryTasksInQueue builds a ReadInfoInQueue snapshot for the query path:
// the ready-queue and execute-queue counters plus the averaged queue duration
// read from collector.Average. The query average is reset when the function
// returns successfully; on error an empty ReadInfoInQueue is returned and
// nothing is reset.
func getQueryTasksInQueue() (metricsinfo.ReadInfoInQueue, error) {
	avgQueueTime, err := collector.Average.Average(metricsinfo.QueryQueueMetric)
	if err != nil {
		return metricsinfo.ReadInfoInQueue{}, err
	}
	// Deferred so the reset happens only after the snapshot below is built.
	defer collector.Average.Reset(metricsinfo.QueryQueueMetric)

	var info metricsinfo.ReadInfoInQueue
	info.ReadyQueue = collector.Counter.Get(
		collector.ConstructLabel(metricsinfo.ReadyQueueType, metricsinfo.QueryQueueMetric))
	info.ExecuteChan = collector.Counter.Get(
		collector.ConstructLabel(metricsinfo.ExecuteQueueType, metricsinfo.QueryQueueMetric))
	// NOTE(review): QueryTask.PreExecute appears to add samples in
	// microseconds, while this conversion treats the average as nanoseconds —
	// confirm the unit expected by consumers of AvgQueueDuration.
	info.AvgQueueDuration = time.Duration(int64(avgQueueTime))
	return info, nil
}

// getQuotaMetrics returns QueryNodeQuotaMetrics.
func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) {
rms, err := getRateMetric()
if err != nil {
return nil, err
}

sqms, err := getSearchNQInQueue()
if err != nil {
return nil, err
}

qqms, err := getQueryTasksInQueue()
if err != nil {
return nil, err
}

minTsafeChannel, minTsafe := node.tSafeManager.Min()
collections := node.manager.Collection.List()
nodeID := fmt.Sprint(node.GetNodeID())
Expand Down Expand Up @@ -178,8 +133,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
MinFlowGraphTt: minTsafe,
NumFlowGraph: node.pipelineManager.Num(),
},
SearchQueue: sqms,
QueryQueue: qqms,
GrowingSegmentsSize: totalGrowingSize,
Effect: metricsinfo.NodeEffect{
NodeID: node.GetNodeID(),
Expand Down
5 changes: 0 additions & 5 deletions internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tasks"
Expand Down Expand Up @@ -805,8 +804,6 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.ReduceShards, metrics.BatchReduce).
Observe(float64(reduceLatency.Milliseconds()))

collector.Rate.Add(metricsinfo.NQPerSecond, float64(req.GetReq().GetNq()))
collector.Rate.Add(metricsinfo.SearchThroughput, float64(proto.Size(req)))
metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.SearchLabel).
Add(float64(proto.Size(req)))

Expand Down Expand Up @@ -953,7 +950,6 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i
metrics.QueryLabel, metrics.ReduceShards, metrics.BatchReduce).
Observe(float64(reduceLatency.Milliseconds()))

collector.Rate.Add(metricsinfo.NQPerSecond, 1)
metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.QueryLabel).Add(float64(proto.Size(req)))
relatedDataSize := lo.Reduce(toMergeResults, func(acc int64, result *internalpb.RetrieveResults, _ int) int64 {
return acc + result.GetCostAggregation().GetTotalRelatedDataSize()
Expand Down Expand Up @@ -1016,7 +1012,6 @@ func (node *QueryNode) QueryStream(req *querypb.QueryRequest, srv querypb.QueryN
return nil
}

collector.Rate.Add(metricsinfo.NQPerSecond, 1)
metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.QueryLabel).Add(float64(proto.Size(req)))
return nil
}
Expand Down
4 changes: 0 additions & 4 deletions internal/querynodev2/tasks/query_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -87,8 +85,6 @@ func (t *QueryTask) PreExecute() error {
username).
Observe(inQueueDurationMS)

// Update collector for query node quota.
collector.Average.Add(metricsinfo.QueryQueueMetric, float64(inQueueDuration.Microseconds()))
return nil
}

Expand Down
5 changes: 0 additions & 5 deletions internal/querynodev2/tasks/search_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -120,9 +118,6 @@ func (t *SearchTask) PreExecute() error {
username).
Observe(inQueueDurationMS)

// Update collector for query node quota.
collector.Average.Add(metricsinfo.SearchQueueMetric, float64(inQueueDuration.Microseconds()))

// Execute merged task's PreExecute.
for _, subTask := range t.others {
err := subTask.PreExecute()
Expand Down
Loading

0 comments on commit 78089e6

Please sign in to comment.