Skip to content

Commit

Permalink
Merge branch 'milvus-io:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
congqixia authored Jul 1, 2024
2 parents 29ff1c3 + 6404f1b commit 426bb36
Show file tree
Hide file tree
Showing 36 changed files with 1,306 additions and 202 deletions.
10 changes: 6 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ type Client interface {
// DropDatabase drop database with the given db name.
DropDatabase(ctx context.Context, dbName string, opts ...DropDatabaseOption) error
// AlterDatabase alter database props with given db name.
AlterDatabase(ctx context.Context, collName string, attrs ...entity.DatabaseAttribute) error
AlterDatabase(ctx context.Context, dbName string, attrs ...entity.DatabaseAttribute) error
DescribeDatabase(ctx context.Context, dbName string) (*entity.Database, error)

// -- collection --

// NewCollection intializeds a new collection with pre defined attributes
NewCollection(ctx context.Context, collName string, dimension int64, opts ...CreateCollectionOption) error
// ListCollections list collections from connection
ListCollections(ctx context.Context) ([]*entity.Collection, error)
ListCollections(ctx context.Context, opts ...ListCollectionOption) ([]*entity.Collection, error)
// CreateCollection create collection using provided schema
CreateCollection(ctx context.Context, schema *entity.Schema, shardsNum int32, opts ...CreateCollectionOption) error
// DescribeCollection describe collection meta
Expand Down Expand Up @@ -134,7 +134,7 @@ type Client interface {
Flush(ctx context.Context, collName string, async bool, opts ...FlushOption) error
// FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error
// currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup)
FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error)
FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error)
// DeleteByPks deletes entries related to provided primary keys
DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error
// Delete deletes entries match expression
Expand Down Expand Up @@ -214,7 +214,9 @@ type Client interface {
// ListResourceGroups returns list of resource group names in current Milvus instance.
ListResourceGroups(ctx context.Context) ([]string, error)
// CreateResourceGroup creates a resource group with provided name.
CreateResourceGroup(ctx context.Context, rgName string) error
CreateResourceGroup(ctx context.Context, rgName string, opts ...CreateResourceGroupOption) error
// UpdateResourceGroups updates resource groups with provided options.
UpdateResourceGroups(ctx context.Context, opts ...UpdateResourceGroupsOption) error
// DescribeResourceGroup returns resource groups information.
DescribeResourceGroup(ctx context.Context, rgName string) (*entity.ResourceGroup, error)
// DropResourceGroup drops the resource group with provided name.
Expand Down
13 changes: 12 additions & 1 deletion client/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,25 @@ func handleRespStatus(status *commonpb.Status) error {

// ListCollections list collections from connection
// Note that schema info are not provided in collection list
func (c *GrpcClient) ListCollections(ctx context.Context) ([]*entity.Collection, error) {
func (c *GrpcClient) ListCollections(ctx context.Context, opts ...ListCollectionOption) ([]*entity.Collection, error) {
if c.Service == nil {
return []*entity.Collection{}, ErrClientNotReady
}

o := &listCollectionOpt{}
for _, opt := range opts {
opt(o)
}

req := &milvuspb.ShowCollectionsRequest{
DbName: "",
TimeStamp: 0, // means now
}

if o.showInMemory {
req.Type = milvuspb.ShowType_InMemory
}

resp, err := c.Service.ShowCollections(ctx, req)
if err != nil {
return []*entity.Collection{}, err
Expand Down
27 changes: 19 additions & 8 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

"github.com/milvus-io/milvus-sdk-go/v2/entity"
Expand Down Expand Up @@ -185,18 +186,18 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column
// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool, opts ...FlushOption) error {
_, _, _, err := c.FlushV2(ctx, collName, async, opts...)
_, _, _, _, err := c.FlushV2(ctx, collName, async, opts...)
return err
}

// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, error) {
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, opts ...FlushOption) ([]int64, []int64, int64, map[string]msgpb.MsgPosition, error) {
if c.Service == nil {
return nil, nil, 0, ErrClientNotReady
return nil, nil, 0, nil, ErrClientNotReady
}
if err := c.checkCollectionExists(ctx, collName); err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
req := &milvuspb.FlushRequest{
DbName: "", // reserved,
Expand All @@ -207,11 +208,12 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o
}
resp, err := c.Service.Flush(ctx, req)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
if err := handleRespStatus(resp.GetStatus()); err != nil {
return nil, nil, 0, err
return nil, nil, 0, nil, err
}
channelCPs := resp.GetChannelCps()
if !async {
segmentIDs, has := resp.GetCollSegIDs()[collName]
ids := segmentIDs.GetData()
Expand All @@ -232,14 +234,23 @@ func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool, o
// respect context deadline/cancel
select {
case <-ctx.Done():
return nil, nil, 0, errors.New("deadline exceeded")
return nil, nil, 0, nil, errors.New("deadline exceeded")
default:
}
time.Sleep(200 * time.Millisecond)
}
}
}
return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], nil
channelCPEntities := make(map[string]msgpb.MsgPosition, len(channelCPs))
for k, v := range channelCPs {
channelCPEntities[k] = msgpb.MsgPosition{
ChannelName: v.GetChannelName(),
MsgID: v.GetMsgID(),
MsgGroup: v.GetMsgGroup(),
Timestamp: v.GetTimestamp(),
}
}
return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], channelCPEntities, nil
}

// DeleteByPks deletes entries related to provided primary keys
Expand Down
19 changes: 14 additions & 5 deletions client/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ type QueryIterator struct {
// init fetches the first batch of data and put it into cache.
// this operation could be used to check all the parameters before returning the iterator.
func (itr *QueryIterator) init(ctx context.Context) error {
if itr.batchSize <= 0 {
return errors.New("batch size cannot less than 1")
}

rs, err := itr.fetchNextBatch(ctx)
if err != nil {
return err
Expand All @@ -116,15 +120,20 @@ func (itr *QueryIterator) composeIteratorExpr() string {
}

expr := strings.TrimSpace(itr.expr)
if expr != "" {
expr += " and "
}

switch itr.pkField.DataType {
case entity.FieldTypeInt64:
expr += fmt.Sprintf("%s > %d", itr.pkField.Name, itr.lastPK)
if len(expr) == 0 {
expr = fmt.Sprintf("%s > %d", itr.pkField.Name, itr.lastPK)
} else {
expr = fmt.Sprintf("(%s) and %s > %d", expr, itr.pkField.Name, itr.lastPK)
}
case entity.FieldTypeVarChar:
expr += fmt.Sprintf(`%s > "%s"`, itr.pkField.Name, itr.lastPK)
if len(expr) == 0 {
expr = fmt.Sprintf(`%s > "%s"`, itr.pkField.Name, itr.lastPK)
} else {
expr = fmt.Sprintf(`(%s) and %s > "%s"`, expr, itr.pkField.Name, itr.lastPK)
}
default:
return itr.expr
}
Expand Down
35 changes: 35 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,18 @@ func GetWithOutputFields(outputFields ...string) GetOption {
}
}

type listCollectionOpt struct {
showInMemory bool
}

type ListCollectionOption func(*listCollectionOpt)

func WithShowInMemory(value bool) ListCollectionOption {
return func(opt *listCollectionOpt) {
opt.showInMemory = value
}
}

type DropCollectionOption func(*milvuspb.DropCollectionRequest)

type ReleaseCollectionOption func(*milvuspb.ReleaseCollectionRequest)
Expand All @@ -310,3 +322,26 @@ type DropPartitionOption func(*milvuspb.DropPartitionRequest)
type LoadPartitionsOption func(*milvuspb.LoadPartitionsRequest)

type ReleasePartitionsOption func(*milvuspb.ReleasePartitionsRequest)

// CreateResourceGroupOption is an option that is used in CreateResourceGroup API.
type CreateResourceGroupOption func(*milvuspb.CreateResourceGroupRequest)

// WithCreateResourceGroupConfig returns a CreateResourceGroupOption that setup the config.
func WithCreateResourceGroupConfig(config *entity.ResourceGroupConfig) CreateResourceGroupOption {
return func(req *milvuspb.CreateResourceGroupRequest) {
req.Config = config
}
}

// UpdateResourceGroupsOption is an option that is used in UpdateResourceGroups API.
type UpdateResourceGroupsOption func(*milvuspb.UpdateResourceGroupsRequest)

// WithUpdateResourceGroupConfig returns an UpdateResourceGroupsOption that sets the new config to the specified resource group.
func WithUpdateResourceGroupConfig(resourceGroupName string, config *entity.ResourceGroupConfig) UpdateResourceGroupsOption {
return func(urgr *milvuspb.UpdateResourceGroupsRequest) {
if urgr.ResourceGroups == nil {
urgr.ResourceGroups = make(map[string]*entity.ResourceGroupConfig)
}
urgr.ResourceGroups[resourceGroupName] = config
}
}
24 changes: 24 additions & 0 deletions client/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,27 @@ func TestMakeSearchQueryOption(t *testing.T) {
assert.Error(t, err)
})
}

func TestWithUpdateResourceGroupConfig(t *testing.T) {
req := &milvuspb.UpdateResourceGroupsRequest{}

WithUpdateResourceGroupConfig("rg1", &entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 1},
})(req)
WithUpdateResourceGroupConfig("rg2", &entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 2},
})(req)

assert.Equal(t, 2, len(req.ResourceGroups))
assert.Equal(t, int32(1), req.ResourceGroups["rg1"].Requests.NodeNum)
assert.Equal(t, int32(2), req.ResourceGroups["rg2"].Requests.NodeNum)
}

func TestWithCreateResourceGroup(t *testing.T) {
req := &milvuspb.CreateResourceGroupRequest{}

WithCreateResourceGroupConfig(&entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 1},
})(req)
assert.Equal(t, int32(1), req.Config.Requests.NodeNum)
}
25 changes: 24 additions & 1 deletion client/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@ func (c *GrpcClient) ListResourceGroups(ctx context.Context) ([]string, error) {
}

// CreateResourceGroup creates a resource group with provided name.
func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string) error {
func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string, opts ...CreateResourceGroupOption) error {
if c.Service == nil {
return ErrClientNotReady
}

req := &milvuspb.CreateResourceGroupRequest{
ResourceGroup: rgName,
}
for _, opt := range opts {
opt(req)
}

resp, err := c.Service.CreateResourceGroup(ctx, req)
if err != nil {
Expand All @@ -54,6 +57,24 @@ func (c *GrpcClient) CreateResourceGroup(ctx context.Context, rgName string) err
return handleRespStatus(resp)
}

// UpdateResourceGroups updates resource groups with provided options.
func (c *GrpcClient) UpdateResourceGroups(ctx context.Context, opts ...UpdateResourceGroupsOption) error {
if c.Service == nil {
return ErrClientNotReady
}

req := &milvuspb.UpdateResourceGroupsRequest{}
for _, opt := range opts {
opt(req)
}

resp, err := c.Service.UpdateResourceGroups(ctx, req)
if err != nil {
return err
}
return handleRespStatus(resp)
}

// DescribeResourceGroup returns resource groups information.
func (c *GrpcClient) DescribeResourceGroup(ctx context.Context, rgName string) (*entity.ResourceGroup, error) {
if c.Service == nil {
Expand All @@ -80,6 +101,8 @@ func (c *GrpcClient) DescribeResourceGroup(ctx context.Context, rgName string) (
LoadedReplica: rg.GetNumLoadedReplica(),
OutgoingNodeNum: rg.GetNumOutgoingNode(),
IncomingNodeNum: rg.GetNumIncomingNode(),
Config: rg.GetConfig(),
Nodes: rg.GetNodes(),
}

return result, nil
Expand Down
64 changes: 63 additions & 1 deletion client/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
)

type ResourceGroupSuite struct {
Expand Down Expand Up @@ -121,6 +122,68 @@ func (s *ResourceGroupSuite) TestCreateResourceGroup() {
})
}

func (s *ResourceGroupSuite) TestUpdateResourceGroups() {
c := s.client
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.Run("normal_run", func() {
defer s.resetMock()
rgName := randStr(10)

s.mock.EXPECT().UpdateResourceGroups(mock.Anything, mock.AnythingOfType("*milvuspb.UpdateResourceGroupsRequest")).
Run(func(_ context.Context, req *milvuspb.UpdateResourceGroupsRequest) {
s.Len(req.ResourceGroups, 1)
s.NotNil(req.ResourceGroups[rgName])
s.Equal(int32(1), req.ResourceGroups[rgName].Requests.NodeNum)
}).
Return(&commonpb.Status{}, nil)

err := c.UpdateResourceGroups(ctx, WithUpdateResourceGroupConfig(rgName, &entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 1},
}))
s.NoError(err)
})

s.Run("request_fails", func() {
defer s.resetMock()

rgName := randStr(10)

s.mock.EXPECT().UpdateResourceGroups(mock.Anything, mock.AnythingOfType("*milvuspb.UpdateResourceGroupsRequest")).
Run(func(_ context.Context, req *milvuspb.UpdateResourceGroupsRequest) {
s.Len(req.ResourceGroups, 1)
s.NotNil(req.ResourceGroups[rgName])
s.Equal(int32(1), req.ResourceGroups[rgName].Requests.NodeNum)
}).
Return(nil, errors.New("mocked grpc error"))

err := c.UpdateResourceGroups(ctx, WithUpdateResourceGroupConfig(rgName, &entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 1},
}))
s.Error(err)
})

s.Run("server_return_err", func() {
defer s.resetMock()

rgName := randStr(10)

s.mock.EXPECT().UpdateResourceGroups(mock.Anything, mock.AnythingOfType("*milvuspb.UpdateResourceGroupsRequest")).
Run(func(_ context.Context, req *milvuspb.UpdateResourceGroupsRequest) {
s.Len(req.ResourceGroups, 1)
s.NotNil(req.ResourceGroups[rgName])
s.Equal(int32(1), req.ResourceGroups[rgName].Requests.NodeNum)
}).
Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil)

err := c.UpdateResourceGroups(ctx, WithUpdateResourceGroupConfig(rgName, &entity.ResourceGroupConfig{
Requests: &entity.ResourceGroupLimit{NodeNum: 1},
}))
s.Error(err)
})
}

func (s *ResourceGroupSuite) TestDescribeResourceGroup() {
c := s.client
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -153,7 +216,6 @@ func (s *ResourceGroupSuite) TestDescribeResourceGroup() {
s.Equal(rgName, req.GetResourceGroup())
}).
Call.Return(func(_ context.Context, req *milvuspb.DescribeResourceGroupRequest) *milvuspb.DescribeResourceGroupResponse {

return &milvuspb.DescribeResourceGroupResponse{
Status: &commonpb.Status{},
ResourceGroup: &milvuspb.ResourceGroup{
Expand Down
4 changes: 3 additions & 1 deletion client/results.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package client

import "github.com/milvus-io/milvus-sdk-go/v2/entity"
import (
"github.com/milvus-io/milvus-sdk-go/v2/entity"
)

// SearchResult contains the result from Search api of client
// IDs is the auto generated id values for the entities
Expand Down
Loading

0 comments on commit 426bb36

Please sign in to comment.