Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to support milvus-backup #567

Merged
merged 1 commit into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ type Client interface {
Insert(ctx context.Context, collName string, partitionName string, columns ...entity.Column) (entity.Column, error)
// Flush flush collection, specified
Flush(ctx context.Context, collName string, async bool) error
// FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error
// currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup)
FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error)
// DeleteByPks deletes entries related to provided primary keys
DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error
// Upsert column-based data of collection, returns id column values
Expand Down
19 changes: 13 additions & 6 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,22 +191,29 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column
// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool) error {
_, _, _, err := c.FlushV2(ctx, collName, async)
return err
}

// Flush force collection to flush memory records into storage
// in sync mode, flush will wait all segments to be flushed
func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error) {
if c.Service == nil {
return ErrClientNotReady
return nil, nil, 0, ErrClientNotReady
}
if err := c.checkCollectionExists(ctx, collName); err != nil {
return err
return nil, nil, 0, err
}
req := &milvuspb.FlushRequest{
DbName: "", // reserved,
CollectionNames: []string{collName},
}
resp, err := c.Service.Flush(ctx, req)
if err != nil {
return err
return nil, nil, 0, err
}
if err := handleRespStatus(resp.GetStatus()); err != nil {
return err
return nil, nil, 0, err
}
if !async {
segmentIDs, has := resp.GetCollSegIDs()[collName]
Expand All @@ -226,14 +233,14 @@ func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool) err
// respect context deadline/cancel
select {
case <-ctx.Done():
return errors.New("deadline exceeded")
return nil, nil, 0, errors.New("deadline exceeded")
default:
}
time.Sleep(200 * time.Millisecond)
}
}
}
return nil
return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], nil
}

// DeleteByPks deletes entries related to provided primary keys
Expand Down
9 changes: 9 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ func WithEndTs(endTs int64) BulkInsertOption {
}
}

// IsBackup specifies it is triggered by backup tool
func IsBackup() BulkInsertOption {
return func(req *milvuspb.ImportRequest) {
optionMap := entity.KvPairsMap(req.GetOptions())
optionMap["backup"] = "true"
req.Options = entity.MapKvPairs(optionMap)
}
}

type getOption struct {
partitionNames []string
outputFields []string
Expand Down
Loading