Skip to content

Commit

Permalink
fix: Fix update loading collection's load config doesn't work
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Dec 19, 2024
1 parent 451deb3 commit 59a23ec
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 2 deletions.
4 changes: 2 additions & 2 deletions internal/querycoordv2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection

var loadJob job.Job
collection := s.meta.GetCollection(req.GetCollectionID())
if collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded {
if collection != nil {
// if collection is loaded, check if collection is loaded with the same replica number and resource groups
// if replica number or resource group changes, switch to update load config
collectionUsedRG := s.meta.ReplicaManager.GetResourceGroupByCollection(collection.GetCollectionID()).Collect()
Expand Down Expand Up @@ -1240,7 +1240,7 @@ func (s *Server) UpdateLoadConfig(ctx context.Context, req *querypb.UpdateLoadCo
jobs := make([]job.Job, 0, len(req.GetCollectionIDs()))
for _, collectionID := range req.GetCollectionIDs() {
collection := s.meta.GetCollection(collectionID)
if collection == nil || collection.GetStatus() != querypb.LoadStatus_Loaded {
if collection == nil{
err := merr.WrapErrCollectionNotLoaded(collectionID)
log.Warn("failed to update load config", zap.Error(err))
continue
Expand Down
120 changes: 120 additions & 0 deletions tests/integration/replicas/load/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,126 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_WithRGLackOfNode() {
s.releaseCollection(dbName, collectionName)
}

func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_OnLoadingCollection() {
ctx := context.Background()
s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
DBName: dbName,
Dim: dim,
CollectionName: collectionName,
ChannelNum: 1,
SegmentNum: 1,
RowNumPerSegment: 2000,
})

// prepare resource groups
rgNum := 10
rgs := make([]string, 0)
for i := 0; i < rgNum; i++ {
rgs = append(rgs, fmt.Sprintf("rg_%d", i))
s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
ResourceGroup: rgs[i],
Config: &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},

TransferFrom: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
TransferTo: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
},
})
}

resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
s.Len(resp.GetResourceGroups(), rgNum+1)

for i := 1; i < rgNum; i++ {
s.Cluster.AddQueryNode()
}

nodesInRG := make(map[string][]int64)
s.Eventually(func() bool {
matchCounter := 0
for _, rg := range rgs {
resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
ResourceGroup: rg,
})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
if len(resp1.ResourceGroup.Nodes) == 1 {
matchCounter += 1
nodesInRG[rg] = []int64{resp1.ResourceGroup.Nodes[0].NodeId}
}
}
return matchCounter == rgNum
}, 30*time.Second, time.Second)

// trigger collection loading, and modify collection's load config during loading
s.loadCollection(collectionName, dbName, 3, rgs[:3])
loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
ReplicaNumber: 1,
ResourceGroups: rgs[:1],
})
s.NoError(err)
s.True(merr.Ok(loadStatus))
s.loadCollection(collectionName, dbName, 3, rgs[:3])
loadStatus, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
ReplicaNumber: 3,
ResourceGroups: rgs[1:4],
})
s.NoError(err)
s.True(merr.Ok(loadStatus))
s.loadCollection(collectionName, dbName, 3, rgs[:3])
loadStatus, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
ReplicaNumber: 5,
ResourceGroups: rgs[4:9],
})
s.NoError(err)
s.True(merr.Ok(loadStatus))

s.Eventually(func() bool {
resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: dbName,
CollectionName: collectionName,
})
s.NoError(err)
s.True(merr.Ok(resp3.Status))
return len(resp3.GetReplicas()) == 5
}, 30*time.Second, 1*time.Second)

s.Eventually(func() bool {
segmentNum, channelNum := 0, 0
for _, qn := range s.Cluster.GetAllQueryNodes() {
resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
s.True(merr.Ok(resp.Status))
segmentNum += len(resp.Segments)
channelNum += len(resp.Channels)
}
return segmentNum == 5 && channelNum == 5
}, 30*time.Second, 1*time.Second)

s.releaseCollection(dbName, collectionName)
}

func TestReplicas(t *testing.T) {
suite.Run(t, new(LoadTestSuite))
}

0 comments on commit 59a23ec

Please sign in to comment.