Skip to content

Commit

Permalink
Update to support milvus-backup
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Aug 28, 2023
1 parent 9124e54 commit 9a40318
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
3 changes: 3 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ type Client interface {
Insert(ctx context.Context, collName string, partitionName string, columns ...entity.Column) (entity.Column, error)
// Flush flush collection, specified
Flush(ctx context.Context, collName string, async bool) error
// FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error
// currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup)
FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error)
// DeleteByPks deletes entries related to provided primary keys
DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error
// Upsert column-based data of collection, returns id column values
Expand Down
19 changes: 13 additions & 6 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,22 +191,29 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column
// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool) error {
_, _, _, err := c.FlushV2(ctx, collName, async)
return err
}

// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error) {
if c.Service == nil {
return ErrClientNotReady
return nil, nil, 0, ErrClientNotReady
}
if err := c.checkCollectionExists(ctx, collName); err != nil {
return err
return nil, nil, 0, err
}
req := &milvuspb.FlushRequest{
DbName: "", // reserved,
CollectionNames: []string{collName},
}
resp, err := c.Service.Flush(ctx, req)
if err != nil {
return err
return nil, nil, 0, err
}
if err := handleRespStatus(resp.GetStatus()); err != nil {
return err
return nil, nil, 0, err
}
if !async {
segmentIDs, has := resp.GetCollSegIDs()[collName]
Expand All @@ -226,14 +233,14 @@ func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool) err
// respect context deadline/cancel
select {
case <-ctx.Done():
return errors.New("deadline exceeded")
return nil, nil, 0, errors.New("deadline exceeded")
default:
}
time.Sleep(200 * time.Millisecond)
}
}
}
return nil
return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], nil
}

// DeleteByPks deletes entries related to provided primary keys
Expand Down
9 changes: 9 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ func WithEndTs(endTs int64) BulkInsertOption {
}
}

// IsBackup specifies it is triggered by backup tool
func IsBackup() BulkInsertOption {
return func(req *milvuspb.ImportRequest) {
optionMap := entity.KvPairsMap(req.GetOptions())
optionMap["backup"] = "true"
req.Options = entity.MapKvPairs(optionMap)
}
}

type getOption struct {
partitionNames []string
outputFields []string
Expand Down

0 comments on commit 9a40318

Please sign in to comment.