From 5c3b744b0cadf9802eb9cce8b6a49c42987f5818 Mon Sep 17 00:00:00 2001 From: groot Date: Tue, 13 Jun 2023 10:22:38 +0800 Subject: [PATCH] Fix potential crash bug of bulkinsert (#24763) Signed-off-by: yhmo --- internal/datanode/mock_test.go | 16 ++++++++++++++-- internal/datanode/services.go | 3 +++ internal/datanode/services_test.go | 5 +++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index ba9555c2dfe29..5b84f4f48e7e7 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -230,10 +230,14 @@ type DataCoordFactory struct { AddSegmentError bool AddSegmentNotSuccess bool + AddSegmentEmpty bool } func (ds *DataCoordFactory) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { - return &datapb.AssignSegmentIDResponse{ + if ds.AddSegmentError { + return nil, errors.New("Error") + } + res := &datapb.AssignSegmentIDResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, @@ -242,7 +246,15 @@ func (ds *DataCoordFactory) AssignSegmentID(ctx context.Context, req *datapb.Ass SegID: 666, }, }, - }, nil + } + if ds.AddSegmentNotSuccess { + res.Status = &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + } + } else if ds.AddSegmentEmpty { + res.SegIDAssignments = []*datapb.SegmentIDAssignment{} + } + return res, nil } func (ds *DataCoordFactory) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) { diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 2ac6b07b81832..9ba3efc3d8e0f 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -642,6 +642,9 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil if resp.Status.ErrorCode != commonpb.ErrorCode_Success { return 0, "", fmt.Errorf("syncSegmentID Failed:%s", resp.Status.Reason) } + if len(resp.SegIDAssignments) == 0 || resp.SegIDAssignments[0] == nil { + return 0, "", fmt.Errorf("syncSegmentID Failed: the collection was dropped") + } segmentID := resp.SegIDAssignments[0].SegID log.Info("new segment assigned", zap.Int64("task ID", importTaskID), diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index a476de694d269..9d709c40ac44b 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -452,6 +452,11 @@ func (s *DataNodeServicesSuite) TestImport() { s.Assert().NoError(err) s.node.dataCoord.(*DataCoordFactory).AddSegmentNotSuccess = false + s.node.dataCoord.(*DataCoordFactory).AddSegmentEmpty = true + _, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) + s.Assert().NoError(err) + s.node.dataCoord.(*DataCoordFactory).AddSegmentEmpty = false + stat, err := s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) s.Assert().NoError(err) s.Assert().True(merr.Ok(stat))