Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Check load fields for previous loaded collection #35905

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions internal/querycoordv2/job/job_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package job
import (
"context"
"fmt"
"reflect"
"time"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -104,6 +105,14 @@ func (job *LoadCollectionJob) PreExecute() error {
return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded collection")
}

if !reflect.DeepEqual(collection.GetLoadFields(), req.GetLoadFields()) {
log.Warn("collection with different load field list exists, release this collection first before chaning its replica number",
zap.Int64s("loadedFieldIDs", collection.GetLoadFields()),
zap.Int64s("reqFieldIDs", req.GetLoadFields()),
)
return merr.WrapErrParameterInvalid(collection.GetLoadFields(), req.GetLoadFields(), "can't change the load field list for loaded collection")
}

return nil
}

Expand Down Expand Up @@ -289,6 +298,14 @@ func (job *LoadPartitionJob) PreExecute() error {
return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded partitions")
}

if !reflect.DeepEqual(collection.GetLoadFields(), req.GetLoadFields()) {
log.Warn("collection with different load field list exists, release this collection first before chaning its replica number",
zap.Int64s("loadedFieldIDs", collection.GetLoadFields()),
zap.Int64s("reqFieldIDs", req.GetLoadFields()),
)
return merr.WrapErrParameterInvalid(collection.GetLoadFields(), req.GetLoadFields(), "can't change the load field list for loaded collection")
}

return nil
}

Expand Down
54 changes: 54 additions & 0 deletions internal/querycoordv2/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,32 @@ func (suite *JobSuite) TestLoadCollection() {
suite.ErrorIs(err, merr.ErrParameterInvalid)
}

// Test load existed collection with different load fields
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadCollection {
continue
}
req := &querypb.LoadCollectionRequest{
CollectionID: collection,
LoadFields: []int64{100, 101},
}
job := NewLoadCollectionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, merr.ErrParameterInvalid)
}

// Test load partition while collection exists
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadCollection {
Expand Down Expand Up @@ -514,6 +540,34 @@ func (suite *JobSuite) TestLoadPartition() {
suite.ErrorIs(err, merr.ErrParameterInvalid)
}

// Test load partition with different load fields
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
continue
}

req := &querypb.LoadPartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
LoadFields: []int64{100, 101},
}
job := NewLoadPartitionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, merr.ErrParameterInvalid)
}

// Test load partition with more partition
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
Expand Down
Loading