Skip to content

Commit

Permalink
enhance: move the search utilities from querynode into new module (mi…
Browse files Browse the repository at this point in the history
…lvus-io#37531)

issue: milvus-io#33285

- The search utilities will be shared between query node and streaming
node.

Signed-off-by: chyezh <chyezh@outlook.com>
  • Loading branch information
chyezh authored Nov 11, 2024
1 parent 5e90f34 commit fca946d
Show file tree
Hide file tree
Showing 19 changed files with 26 additions and 22 deletions.
2 changes: 1 addition & 1 deletion internal/querynodev2/delegator/delegator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/delegator/deletebuffer"
"github.com/milvus-io/milvus/internal/querynodev2/optimizers"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/function"
"github.com/milvus-io/milvus/internal/util/reduce"
"github.com/milvus-io/milvus/internal/util/searchutil/optimizers"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
Expand Down
8 changes: 4 additions & 4 deletions internal/querynodev2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ import (
grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/optimizers"
"github.com/milvus-io/milvus/internal/querynodev2/pipeline"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tasks"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/registry"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/internal/util/searchutil/optimizers"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -113,7 +113,7 @@ type QueryNode struct {
loader segments.Loader

// Search/Query
scheduler tasks.Scheduler
scheduler scheduler.Scheduler

// etcd client
etcdCli *clientv3.Client
Expand Down Expand Up @@ -339,7 +339,7 @@ func (node *QueryNode) Init() error {
}

schedulePolicy := paramtable.Get().QueryNodeCfg.SchedulePolicyName.GetValue()
node.scheduler = tasks.NewScheduler(
node.scheduler = scheduler.NewScheduler(
schedulePolicy,
)

Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/optimizers"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/searchutil/optimizers"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tasks"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -689,7 +690,7 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
return resp, nil
}

var task tasks.Task
var task scheduler.Task
if paramtable.Get().QueryNodeCfg.UseStreamComputing.GetAsBool() {
task = tasks.NewStreamingSearchTask(searchCtx, collection, node.manager, req, node.serverID)
} else {
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/tasks/query_stream_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/internal/util/streamrpc"
)

var _ Task = &QueryStreamTask{}
var _ scheduler.Task = &QueryStreamTask{}

func NewQueryStreamTask(ctx context.Context,
collection *segments.Collection,
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/tasks/query_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var _ Task = &QueryTask{}
var _ scheduler.Task = &QueryTask{}

func NewQueryTask(ctx context.Context,
collection *segments.Collection,
Expand Down
9 changes: 5 additions & 4 deletions internal/querynodev2/tasks/search_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
Expand All @@ -30,8 +31,8 @@ import (
)

var (
_ Task = &SearchTask{}
_ MergeTask = &SearchTask{}
_ scheduler.Task = &SearchTask{}
_ scheduler.MergeTask = &SearchTask{}
)

type SearchTask struct {
Expand Down Expand Up @@ -346,7 +347,7 @@ func (t *SearchTask) NQ() int64 {
return t.nq
}

func (t *SearchTask) MergeWith(other Task) bool {
func (t *SearchTask) MergeWith(other scheduler.Task) bool {
switch other := other.(type) {
case *SearchTask:
return t.Merge(other)
Expand Down Expand Up @@ -416,7 +417,7 @@ func NewStreamingSearchTask(ctx context.Context,
}
}

func (t *StreamingSearchTask) MergeWith(other Task) bool {
func (t *StreamingSearchTask) MergeWith(other scheduler.Task) bool {
return false
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tasks
package scheduler

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tasks
package scheduler

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tasks
package scheduler

import (
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tasks
package scheduler

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tasks
package scheduler

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tasks
package scheduler

import (
"container/ring"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tasks
package scheduler

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tasks
package scheduler

import "github.com/milvus-io/milvus/internal/proto/internalpb"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tasks
package scheduler

import (
"fmt"
Expand Down

0 comments on commit fca946d

Please sign in to comment.