From 9a403181f65928950210fec261668e6315e00d59 Mon Sep 17 00:00:00 2001 From: wayblink Date: Thu, 25 May 2023 11:47:16 +0800 Subject: [PATCH] Update to support milvus-backup Signed-off-by: wayblink --- client/client.go | 3 +++ client/insert.go | 19 +++++++++++++------ client/options.go | 9 +++++++++ 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/client/client.go b/client/client.go index eb1403b5..5f3cd730 100644 --- a/client/client.go +++ b/client/client.go @@ -125,6 +125,9 @@ type Client interface { Insert(ctx context.Context, collName string, partitionName string, columns ...entity.Column) (entity.Column, error) // Flush flush collection, specified Flush(ctx context.Context, collName string, async bool) error + // FlushV2 flush collection, specified, return newly sealed segmentIds, all flushed segmentIds of the collection, seal time and error + // currently it is only used in milvus-backup(https://github.com/zilliztech/milvus-backup) + FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error) // DeleteByPks deletes entries related to provided primary keys DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error // Upsert column-based data of collection, returns id column values diff --git a/client/insert.go b/client/insert.go index 2a1f20e6..24fb6828 100644 --- a/client/insert.go +++ b/client/insert.go @@ -191,11 +191,18 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column // Flush force collection to flush memory records into storage // in sync mode, flush will wait all segments to be flushed func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool) error { + _, _, _, err := c.FlushV2(ctx, collName, async) + return err +} + +// Flush force collection to flush memory records into storage +// in sync mode, flush will wait all segments to be flushed +func (c *GrpcClient) FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error) { if c.Service == nil { - return ErrClientNotReady + return nil, nil, 0, ErrClientNotReady } if err := c.checkCollectionExists(ctx, collName); err != nil { - return err + return nil, nil, 0, err } req := &milvuspb.FlushRequest{ DbName: "", // reserved, @@ -203,10 +210,10 @@ func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool) err } resp, err := c.Service.Flush(ctx, req) if err != nil { - return err + return nil, nil, 0, err } if err := handleRespStatus(resp.GetStatus()); err != nil { - return err + return nil, nil, 0, err } if !async { segmentIDs, has := resp.GetCollSegIDs()[collName] @@ -226,14 +233,14 @@ func (c *GrpcClient) Flush(ctx context.Context, collName string, async bool) err // respect context deadline/cancel select { case <-ctx.Done(): - return errors.New("deadline exceeded") + return nil, nil, 0, errors.New("deadline exceeded") default: } time.Sleep(200 * time.Millisecond) } } } - return nil + return resp.GetCollSegIDs()[collName].GetData(), resp.GetFlushCollSegIDs()[collName].GetData(), resp.GetCollSealTimes()[collName], nil } // DeleteByPks deletes entries related to provided primary keys diff --git a/client/options.go b/client/options.go index f4e5b9b0..14875c3e 100644 --- a/client/options.go +++ b/client/options.go @@ -235,6 +235,15 @@ func WithEndTs(endTs int64) BulkInsertOption { } } +// IsBackup specifies it is triggered by backup tool +func IsBackup() BulkInsertOption { + return func(req *milvuspb.ImportRequest) { + optionMap := entity.KvPairsMap(req.GetOptions()) + optionMap["backup"] = "true" + req.Options = entity.MapKvPairs(optionMap) + } +} + type getOption struct { partitionNames []string outputFields []string