enhance: Make import-related error message clearer (milvus-io#28978)
issue: milvus-io#28976

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
xiaocai2333 authored Dec 8, 2023
1 parent 464bc9e commit 2b05460
Showing 12 changed files with 135 additions and 112 deletions.
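The substantive change across the files below is to stop using the hardcoded import limits ReadBufferSize (16MB), MaxSegmentSizeInMemory (512MB) and MaxTotalSizeInMemory (6GB) and to read them from the shared paramtable configuration instead. The standalone sketch below only illustrates that pattern; it is not part of the commit and assumes nothing beyond the accessors that appear in the diff.

// Illustrative sketch (not part of the commit).
package main

import (
    "fmt"

    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

// Params mirrors the package-level handle introduced in import_wrapper.go.
var Params *paramtable.ComponentParam = paramtable.Get()

func main() {
    // The config table must be initialized before values are read, which is
    // why the updated tests call paramtable.Init().
    paramtable.Init()

    // These lookups replace the former constants ReadBufferSize (16MB),
    // MaxSegmentSizeInMemory (512MB) and MaxTotalSizeInMemory (6GB).
    fmt.Println(Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64())
    fmt.Println(Params.DataCoordCfg.SegmentMaxSize.GetAsInt64())
    fmt.Println(Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64())
}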
4 changes: 2 additions & 2 deletions internal/util/importutil/binlog_adapter.go
@@ -103,8 +103,8 @@ func NewBinlogAdapter(ctx context.Context,
}

// amend the segment size to avoid potential OOM risk
if adapter.blockSize > MaxSegmentSizeInMemory {
adapter.blockSize = MaxSegmentSizeInMemory
if adapter.blockSize > Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() {
adapter.blockSize = Params.DataCoordCfg.SegmentMaxSize.GetAsInt64()
}

return adapter, nil
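The amended clamp above now caps the adapter's block size at the configured segment maximum instead of the removed 512MB constant. A minimal sketch of the same check in isolation (the helper name is illustrative, not part of the commit):

// clampBlockSize caps a requested block size at the configured ceiling,
// mirroring the amended check in NewBinlogAdapter (sketch only).
func clampBlockSize(blockSize, configuredMax int64) int64 {
    if blockSize > configuredMax {
        return configuredMax
    }
    return blockSize
}

// usage (sketch): adapter.blockSize = clampBlockSize(adapter.blockSize,
//     Params.DataCoordCfg.SegmentMaxSize.GetAsInt64())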
6 changes: 4 additions & 2 deletions internal/util/importutil/binlog_adapter_test.go
@@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

const (
@@ -69,6 +70,7 @@ func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) [

func Test_BinlogAdapterNew(t *testing.T) {
ctx := context.Background()
paramtable.Init()

// nil schema
adapter, err := NewBinlogAdapter(ctx, nil, 1024, 2048, nil, nil, 0, math.MaxUint64)
@@ -103,10 +105,10 @@ func Test_BinlogAdapterNew(t *testing.T) {
assert.NoError(t, err)

// amend blockSize, blockSize should be less than MaxSegmentSizeInMemory
adapter, err = NewBinlogAdapter(ctx, collectionInfo, MaxSegmentSizeInMemory+1, 1024, &MockChunkManager{}, flushFunc, 0, math.MaxUint64)
adapter, err = NewBinlogAdapter(ctx, collectionInfo, Params.DataCoordCfg.SegmentMaxSize.GetAsInt64()+1, 1024, &MockChunkManager{}, flushFunc, 0, math.MaxUint64)
assert.NotNil(t, adapter)
assert.NoError(t, err)
assert.Equal(t, int64(MaxSegmentSizeInMemory), adapter.blockSize)
assert.Equal(t, Params.DataCoordCfg.SegmentMaxSize.GetAsInt64(), adapter.blockSize)
}

func Test_BinlogAdapterVerify(t *testing.T) {
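The updated test file calls paramtable.Init() because NewBinlogAdapter now reads Params at construction time, so the global config table has to be populated before the test runs. A minimal illustrative test following the same pattern (the test name and assertion are hypothetical, not part of the commit):

// Illustrative sketch; assumes the same imports as the surrounding test file.
func Test_ParamsInitializedBeforeUse(t *testing.T) {
    paramtable.Init() // required once the code under test reads Params.*
    assert.Greater(t, Params.DataCoordCfg.SegmentMaxSize.GetAsInt64(), int64(0))
}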
2 changes: 1 addition & 1 deletion internal/util/importutil/binlog_parser.go
@@ -225,7 +225,7 @@ func (p *BinlogParser) parseSegmentFiles(segmentHolder *SegmentFilesHolder) erro
}

adapter, err := NewBinlogAdapter(p.ctx, p.collectionInfo, p.blockSize,
MaxTotalSizeInMemory, p.chunkManager, p.callFlushFunc, p.tsStartPoint, p.tsEndPoint)
Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), p.chunkManager, p.callFlushFunc, p.tsStartPoint, p.tsEndPoint)
if err != nil {
log.Warn("Binlog parser: failed to create binlog adapter", zap.Error(err))
return merr.WrapErrImportFailed(fmt.Sprintf("failed to create binlog adapter, error: %v", err))
28 changes: 8 additions & 20 deletions internal/util/importutil/import_wrapper.go
@@ -28,10 +28,10 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@@ -41,20 +41,6 @@ const (
NumpyFileExt = ".npy"
ParquetFileExt = ".parquet"

// parsers read JSON/Numpy/CSV files buffer by buffer, this limitation is to define the buffer size.
ReadBufferSize = 16 * 1024 * 1024 // 16MB

// this limitation is to avoid this OOM risk:
// simetimes system segment max size is a large number, a single segment fields data might cause OOM.
// flush the segment when its data reach this limitation, let the compaction to compact it later.
MaxSegmentSizeInMemory = 512 * 1024 * 1024 // 512MB

// this limitation is to avoid this OOM risk:
// if the shard number is a large number, although single segment size is small, but there are lot of in-memory segments,
// the total memory size might cause OOM.
// TODO: make it configurable.
MaxTotalSizeInMemory = 6 * 1024 * 1024 * 1024 // 6GB

// progress percent value of persist state
ProgressValueForPersist = 90

@@ -67,6 +53,8 @@ const (
ProgressPercent = "progress_percent"
)

var Params *paramtable.ComponentParam = paramtable.Get()

// ReportImportAttempts is the maximum # of attempts to retry when import fails.
var ReportImportAttempts uint = 10

@@ -126,8 +114,8 @@ func NewImportWrapper(ctx context.Context, collectionInfo *CollectionInfo, segme
// average binlogSize is expected to be half of the maxBinlogSize
// and avoid binlogSize to be a tiny value
binlogSize := int64(float32(maxBinlogSize) * 0.5)
if binlogSize < ReadBufferSize {
binlogSize = ReadBufferSize
if binlogSize < Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64() {
binlogSize = Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64()
}

wrapper := &ImportWrapper{
@@ -234,11 +222,11 @@ func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
return rowBased, merr.WrapErrImportFailed(fmt.Sprintf("the file '%s' size is zero", filePath))
}

if size > params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64() {
if size > Params.CommonCfg.ImportMaxFileSize.GetAsInt64() {
log.Warn("import wrapper: file size exceeds the maximum size", zap.String("filePath", filePath),
zap.Int64("fileSize", size), zap.String("MaxFileSize", params.Params.CommonCfg.ImportMaxFileSize.GetValue()))
zap.Int64("fileSize", size), zap.String("MaxFileSize", Params.CommonCfg.ImportMaxFileSize.GetValue()))
return rowBased, merr.WrapErrImportFailed(fmt.Sprintf("the file '%s' size exceeds the maximum size: %s bytes",
filePath, params.Params.CommonCfg.ImportMaxFileSize.GetValue()))
filePath, Params.CommonCfg.ImportMaxFileSize.GetValue()))
}
totalSize += size
}
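Two behaviours in this file now depend on configuration: NewImportWrapper floors its block size at the configured read buffer size rather than the removed 16MB constant, and fileValidation compares file sizes against CommonCfg.ImportMaxFileSize through the local Params handle. A sketch of the block-size computation in isolation (the helper name is illustrative, not part of the commit):

// estimateBlockSize mirrors the sizing logic in NewImportWrapper: target half
// of the maximum binlog size, but never go below the configured read buffer.
func estimateBlockSize(maxBinlogSize, readBufferSize int64) int64 {
    binlogSize := int64(float32(maxBinlogSize) * 0.5)
    if binlogSize < readBufferSize {
        binlogSize = readBufferSize
    }
    return binlogSize
}

// usage (sketch):
//   estimateBlockSize(maxBinlogSize, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64())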
41 changes: 20 additions & 21 deletions internal/util/importutil/import_wrapper_test.go
@@ -37,7 +37,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -190,7 +189,7 @@ func Test_ImportWrapperNew(t *testing.T) {
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, nil, 1, ReadBufferSize, nil, cm, nil, nil)
wrapper := NewImportWrapper(ctx, nil, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), nil, cm, nil, nil)
assert.Nil(t, wrapper)

schema := &schemapb.CollectionSchema{
@@ -210,7 +209,7 @@ func Test_ImportWrapperNew(t *testing.T) {
})
collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
assert.NoError(t, err)
wrapper = NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, cm, nil, nil)
wrapper = NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), nil, cm, nil, nil)
assert.NotNil(t, wrapper)

assignSegFunc := func(shardID int, partID int64) (int64, string, error) {
@@ -287,7 +286,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
assert.NoError(t, err)

t.Run("success case", func(t *testing.T) {
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
files := make([]string, 0)
files = append(files, filePath)
@@ -313,7 +312,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
assert.NoError(t, err)

importResult.State = commonpb.ImportState_ImportStarted
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
files := make([]string, 0)
files = append(files, filePath)
@@ -325,7 +324,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
t.Run("file doesn't exist", func(t *testing.T) {
files := make([]string, 0)
files = append(files, "/dummy/dummy.json")
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
err = wrapper.Import(files, ImportOptions{OnlyValidate: true})
assert.Error(t, err)
})
@@ -368,7 +367,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
files := createSampleNumpyFiles(t, cm)

t.Run("success case", func(t *testing.T) {
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

err = wrapper.Import(files, DefaultImportOptions())
@@ -386,7 +385,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
files[1] = filePath

importResult.State = commonpb.ImportState_ImportStarted
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

err = wrapper.Import(files, DefaultImportOptions())
@@ -397,7 +396,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
t.Run("file doesn't exist", func(t *testing.T) {
files := make([]string, 0)
files = append(files, "/dummy/dummy.npy")
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
err = wrapper.Import(files, DefaultImportOptions())
assert.Error(t, err)
})
@@ -517,7 +516,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

files := make([]string, 0)
@@ -561,7 +560,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {

collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, nil, nil)

t.Run("unsupported file type", func(t *testing.T) {
files := []string{"uid.txt"}
@@ -611,16 +610,16 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
t.Run("empty file list", func(t *testing.T) {
files := []string{}
cm.size = 0
wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, nil, nil)
rowBased, err := wrapper.fileValidation(files)
assert.NoError(t, err)
assert.False(t, rowBased)
})

t.Run("file size exceed MaxFileSize limit", func(t *testing.T) {
files := []string{"a/1.json"}
cm.size = params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64() + 1
wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
cm.size = Params.CommonCfg.ImportMaxFileSize.GetAsInt64() + 1
wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, nil, nil)
rowBased, err := wrapper.fileValidation(files)
assert.Error(t, err)
assert.True(t, rowBased)
@@ -691,7 +690,7 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

files := []string{filePath}
@@ -738,7 +737,7 @@ func Test_ImportWrapperReportFailColumnBased_numpy(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(createNumpySchema(), 2, []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

wrapper.reportImportAttempts = 2
@@ -773,7 +772,7 @@ func Test_ImportWrapperIsBinlogImport(t *testing.T) {

collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, nil, nil)

// empty paths
paths := []string{}
@@ -837,7 +836,7 @@ func Test_ImportWrapperDoBinlogImport(t *testing.T) {

collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, nil, nil)
paths := []string{
"/tmp",
"/tmp",
@@ -900,7 +899,7 @@ func Test_ImportWrapperReportPersisted(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(1024), ReadBufferSize, nil, nil, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(1024), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), nil, nil, importResult, reportFunc)
assert.NotNil(t, wrapper)

rowCounter := &rowCounterTest{}
@@ -943,7 +942,7 @@ func Test_ImportWrapperUpdateProgressPercent(t *testing.T) {

collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, nil, nil, nil)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), nil, nil, nil, nil)
assert.NotNil(t, wrapper)
assert.Equal(t, int64(0), wrapper.progressPercent)

@@ -982,7 +981,7 @@ func Test_ImportWrapperFlushFunc(t *testing.T) {
schema := sampleSchema()
collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, nil, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), nil, nil, importResult, reportFunc)
assert.NotNil(t, wrapper)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

4 changes: 2 additions & 2 deletions internal/util/importutil/json_handler.go
@@ -130,15 +130,15 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {

// if rows is nil, that means read to end of file, force flush all data
if rows == nil {
err := tryFlushBlocks(v.ctx, v.shardsData, v.collectionInfo.Schema, v.callFlushFunc, v.blockSize, MaxTotalSizeInMemory, true)
err := tryFlushBlocks(v.ctx, v.shardsData, v.collectionInfo.Schema, v.callFlushFunc, v.blockSize, Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), true)
log.Info("JSON row consumer finished")
return err
}

// rows is not nil, flush if necessary:
// 1. data block size larger than v.blockSize will be flushed
// 2. total data size exceeds MaxTotalSizeInMemory, the largest data block will be flushed
err := tryFlushBlocks(v.ctx, v.shardsData, v.collectionInfo.Schema, v.callFlushFunc, v.blockSize, MaxTotalSizeInMemory, false)
err := tryFlushBlocks(v.ctx, v.shardsData, v.collectionInfo.Schema, v.callFlushFunc, v.blockSize, Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), false)
if err != nil {
log.Warn("JSON row consumer: try flush data but failed", zap.Error(err))
return merr.WrapErrImportFailed(fmt.Sprintf("try flush data but failed, error: %v", err))
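The comments above describe when blocks are flushed: a shard block is written once it outgrows blockSize, and when the total in-memory size exceeds the configured maximum (formerly MaxTotalSizeInMemory, now BulkInsertMaxMemorySize) only the largest block is flushed; a nil rows slice at end of file forces everything out. A simplified stand-in for that per-block decision (not the real tryFlushBlocks signature):

// shouldFlushBlock condenses the flush rules documented above into a single
// predicate evaluated per block (sketch only).
func shouldFlushBlock(blockBytes, blockSize, totalBytes, maxTotal int64, isLargestBlock, force bool) bool {
    if force || blockBytes > blockSize {
        return true
    }
    // Under overall memory pressure, only the largest block is flushed.
    return totalBytes > maxTotal && isLargestBlock
}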
2 changes: 1 addition & 1 deletion internal/util/importutil/json_parser.go
@@ -73,7 +73,7 @@ func adjustBufSize(parser *JSONParser, collectionSchema *schemapb.CollectionSche
// for low dimensional vector, the bufSize is a large value, read more rows each time
bufRowCount := parser.bufRowCount
for {
if bufRowCount*sizePerRecord > ReadBufferSize {
if bufRowCount*sizePerRecord > Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt() {
bufRowCount--
} else {
break
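The loop above shrinks bufRowCount until the estimated buffer fits the configured read buffer size (formerly the 16MB ReadBufferSize constant). For large initial row counts this is equivalent to an integer division; the closed-form sketch below adds a one-row floor that is not visible in the excerpt and should be treated as an assumption.

// rowsPerBuffer computes the largest row count whose estimated size fits in
// the configured read buffer, matching the effect of the decrement loop when
// the initial bufRowCount is large (sketch only).
func rowsPerBuffer(initialRowCount, sizePerRecord, readBufferSize int) int {
    n := readBufferSize / sizePerRecord
    if n < 1 {
        n = 1 // assumed floor; the excerpt does not show how zero is handled
    }
    if n > initialRowCount {
        n = initialRowCount
    }
    return n
}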
4 changes: 2 additions & 2 deletions internal/util/importutil/numpy_adapter.go
@@ -563,9 +563,9 @@ func (n *NumpyAdapter) ReadString(count int) ([]string, error) {
// read string one by one is not efficient, here we read strings batch by batch, each batch size is no more than 16MB
batchRead := 1 // rows of each batch, make sure this value is equal or greater than 1
if utf {
batchRead += ReadBufferSize / (utf8.UTFMax * maxLen)
batchRead += Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt() / (utf8.UTFMax * maxLen)
} else {
batchRead += ReadBufferSize / maxLen
batchRead += Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt() / maxLen
}

log.Info("Numpy adapter: prepare to read varchar batch by batch",
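The batch sizing above keeps each read near the configured read buffer size: for UTF-encoded numpy strings the per-row bound is utf8.UTFMax (4 bytes) times maxLen, otherwise maxLen bytes. As a worked example, a 16MB buffer with maxLen = 256 yields 1 + 16777216/(4*256) = 16385 rows per UTF batch. A standalone sketch of the calculation (helper name illustrative, not part of the commit; requires the unicode/utf8 import):

// batchRows mirrors the batch sizing in ReadString: always read at least one
// row, plus as many rows as fit the read buffer given the per-row upper bound.
func batchRows(readBufferSize, maxLen int, utf bool) int {
    batchRead := 1
    if utf {
        batchRead += readBufferSize / (utf8.UTFMax * maxLen)
    } else {
        batchRead += readBufferSize / maxLen
    }
    return batchRead
}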
4 changes: 2 additions & 2 deletions internal/util/importutil/numpy_parser.go
@@ -439,15 +439,15 @@ func (p *NumpyParser) consume(columnReaders []*NumpyColumnReader) error {
}
tr.Record("splitFieldsData")
// when the estimated size is close to blockSize, save to binlog
err = tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, MaxTotalSizeInMemory, false)
err = tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), false)
if err != nil {
return err
}
tr.Record("tryFlushBlocks")
}

// force flush at the end
return tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, MaxTotalSizeInMemory, true)
return tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), true)
}

// readData method reads numpy data section into a storage.FieldData