Skip to content

Commit

Permalink
fix: [2.4] Fill load field list from old version load info (#35993) (#…
Browse files Browse the repository at this point in the history
…36018)

Cherry-pick from master
pr: #35993
See also #35959

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia authored Sep 6, 2024
1 parent 55b33cd commit e21b09c
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 0 deletions.
43 changes: 43 additions & 0 deletions internal/querycoordv2/meta/collection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
Expand Down Expand Up @@ -169,6 +172,16 @@ func (m *CollectionManager) Recover(broker Broker) error {
continue
}

err := m.upgradeLoadFields(collection, broker)
if err != nil {
if errors.Is(err, merr.ErrCollectionNotFound) {
log.Warn("collection not found, skip upgrade logic and wait for release")
} else {
log.Warn("upgrade load field failed", zap.Error(err))
return err
}
}

m.collections[collection.CollectionID] = &Collection{
CollectionLoadInfo: collection,
}
Expand Down Expand Up @@ -207,6 +220,36 @@ func (m *CollectionManager) Recover(broker Broker) error {
return nil
}

func (m *CollectionManager) upgradeLoadFields(collection *querypb.CollectionLoadInfo, broker Broker) error {
// only fill load fields when value is nil
if collection.LoadFields != nil {
return nil
}

// invoke describe collection to get collection schema
resp, err := broker.DescribeCollection(context.Background(), collection.CollectionID)
if err := merr.CheckRPCCall(resp, err); err != nil {
return err
}

// fill all field id as legacy default behavior
collection.LoadFields = lo.FilterMap(resp.GetSchema().GetFields(), func(fieldSchema *schemapb.FieldSchema, _ int) (int64, bool) {
// load fields list excludes system fields
return fieldSchema.GetFieldID(), !common.IsSystemField(fieldSchema.GetFieldID())
})

// put updated meta back to store
err = m.putCollection(true, &Collection{
CollectionLoadInfo: collection,
LoadPercentage: 100,
})
if err != nil {
return err
}

return nil
}

// upgradeRecover recovers from old version <= 2.2.x for compatibility.
func (m *CollectionManager) upgradeRecover(broker Broker) error {
// for loaded collection from 2.2, it only save a old version CollectionLoadInfo without LoadType.
Expand Down
141 changes: 141 additions & 0 deletions internal/querycoordv2/meta/collection_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ import (
"github.com/stretchr/testify/suite"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand Down Expand Up @@ -494,6 +498,17 @@ func (suite *CollectionManagerSuite) TestUpgradeRecover() {
if suite.loadTypes[i] == querypb.LoadType_LoadCollection {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil)
}
suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Success(),
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: common.RowIDField},
{FieldID: common.TimeStampField},
{FieldID: 100, Name: "pk"},
{FieldID: 101, Name: "vector"},
},
},
}, nil).Maybe()
}

// do recovery
Expand All @@ -508,6 +523,131 @@ func (suite *CollectionManagerSuite) TestUpgradeRecover() {
}
}

func (suite *CollectionManagerSuite) TestUpgradeLoadFields() {
suite.releaseAll()
mgr := suite.mgr

// put old version of collections and partitions
for i, collection := range suite.collections {
mgr.PutCollection(&Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: collection,
ReplicaNumber: suite.replicaNumber[i],
Status: querypb.LoadStatus_Loaded,
LoadType: suite.loadTypes[i],
LoadFields: nil, // use nil Load fields, mocking old load info
},
LoadPercentage: 100,
CreatedAt: time.Now(),
})
for j, partition := range suite.partitions[collection] {
mgr.PutPartition(&Partition{
PartitionLoadInfo: &querypb.PartitionLoadInfo{
CollectionID: collection,
PartitionID: partition,
Status: querypb.LoadStatus_Loaded,
},
LoadPercentage: suite.parLoadPercent[collection][j],
CreatedAt: time.Now(),
})
}
}

// set expectations
for _, collection := range suite.collections {
suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Success(),
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: common.RowIDField},
{FieldID: common.TimeStampField},
{FieldID: 100, Name: "pk"},
{FieldID: 101, Name: "vector"},
},
},
}, nil)
}

// do recovery
suite.clearMemory()
err := mgr.Recover(suite.broker)
suite.NoError(err)
suite.checkLoadResult()

for _, collection := range suite.collections {
newColl := mgr.GetCollection(collection)
suite.ElementsMatch([]int64{100, 101}, newColl.GetLoadFields())
}
}

func (suite *CollectionManagerSuite) TestUpgradeLoadFieldsFail() {
suite.Run("normal_error", func() {
suite.releaseAll()
mgr := suite.mgr

mgr.PutCollection(&Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 100,
ReplicaNumber: 1,
Status: querypb.LoadStatus_Loaded,
LoadType: querypb.LoadType_LoadCollection,
LoadFields: nil, // use nil Load fields, mocking old load info
},
LoadPercentage: 100,
CreatedAt: time.Now(),
})
mgr.PutPartition(&Partition{
PartitionLoadInfo: &querypb.PartitionLoadInfo{
CollectionID: 100,
PartitionID: 1000,
Status: querypb.LoadStatus_Loaded,
},
LoadPercentage: 100,
CreatedAt: time.Now(),
})

suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(100)).Return(nil, merr.WrapErrServiceInternal("mocked")).Once()
// do recovery
suite.clearMemory()
err := mgr.Recover(suite.broker)
suite.Error(err)
})

suite.Run("normal_error", func() {
suite.releaseAll()
mgr := suite.mgr

mgr.PutCollection(&Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 100,
ReplicaNumber: 1,
Status: querypb.LoadStatus_Loaded,
LoadType: querypb.LoadType_LoadCollection,
LoadFields: nil, // use nil Load fields, mocking old load info
},
LoadPercentage: 100,
CreatedAt: time.Now(),
})
mgr.PutPartition(&Partition{
PartitionLoadInfo: &querypb.PartitionLoadInfo{
CollectionID: 100,
PartitionID: 1000,
Status: querypb.LoadStatus_Loaded,
},
LoadPercentage: 100,
CreatedAt: time.Now(),
})

suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(100)).Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(merr.WrapErrCollectionNotFound(100)),
}, nil).Once()
// do recovery
suite.clearMemory()
err := mgr.Recover(suite.broker)
suite.NoError(err)
})
}

func (suite *CollectionManagerSuite) loadAll() {
mgr := suite.mgr

Expand All @@ -523,6 +663,7 @@ func (suite *CollectionManagerSuite) loadAll() {
ReplicaNumber: suite.replicaNumber[i],
Status: status,
LoadType: suite.loadTypes[i],
LoadFields: []int64{100, 101},
},
LoadPercentage: suite.colLoadPercent[i],
CreatedAt: time.Now(),
Expand Down
2 changes: 2 additions & 0 deletions internal/querycoordv2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ func (suite *ServerSuite) loadAll() {
CollectionID: collection,
ReplicaNumber: suite.replicaNumber[collection],
ResourceGroups: []string{meta.DefaultResourceGroupName},
LoadFields: []int64{100, 101},
}
resp, err := suite.server.LoadCollection(ctx, req)
suite.NoError(err)
Expand All @@ -449,6 +450,7 @@ func (suite *ServerSuite) loadAll() {
PartitionIDs: suite.partitions[collection],
ReplicaNumber: suite.replicaNumber[collection],
ResourceGroups: []string{meta.DefaultResourceGroupName},
LoadFields: []int64{100, 101},
}
resp, err := suite.server.LoadPartitions(ctx, req)
suite.NoError(err)
Expand Down

0 comments on commit e21b09c

Please sign in to comment.