From ab17614a0b2c19ee46ad2cc4e78056326110f59a Mon Sep 17 00:00:00 2001 From: groot Date: Mon, 22 May 2023 18:53:26 +0800 Subject: [PATCH] Support dynamic field for bulkinsert (#24265) Signed-off-by: yhmo --- internal/util/importutil/import_util.go | 71 ++++++- internal/util/importutil/import_util_test.go | 95 +++++++++ .../util/importutil/import_wrapper_test.go | 4 +- internal/util/importutil/json_parser.go | 78 +++++++- internal/util/importutil/json_parser_test.go | 186 ++++++++++++++++++ internal/util/importutil/numpy_parser.go | 8 + 6 files changed, 427 insertions(+), 15 deletions(-) diff --git a/internal/util/importutil/import_util.go b/internal/util/importutil/import_util.go index a0370e9c8508e..d23ace8f17153 100644 --- a/internal/util/importutil/import_util.go +++ b/internal/util/importutil/import_util.go @@ -309,6 +309,8 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[ } case schemapb.DataType_JSON: validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { + // for JSON data, we accept two kinds input: string and map[string]interface + // user can write JSON content as {"FieldJSON": "{\"x\": 8}"} or {"FieldJSON": {"x": 8}} if value, ok := obj.(string); ok { var dummy interface{} err := json.Unmarshal([]byte(value), &dummy) @@ -316,6 +318,12 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[ return fmt.Errorf("failed to parse value '%v' for JSON field '%s', error: %w", value, schema.GetName(), err) } field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, []byte(value)) + } else if mp, ok := obj.(map[string]interface{}); ok { + bs, err := json.Marshal(mp) + if err != nil { + return fmt.Errorf("failed to parse value for JSON field '%s', error: %w", schema.GetName(), err) + } + field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, bs) } else { return fmt.Errorf("illegal value '%v' for JSON type field '%s'", obj, schema.GetName()) } @@ -372,6 +380,48 @@ func triggerGC() { debug.FreeOSMemory() } +// if user didn't provide dynamic data, fill the dynamic field by "{}" +func fillDynamicData(blockData map[storage.FieldID]storage.FieldData, collectionSchema *schemapb.CollectionSchema) error { + if !collectionSchema.GetEnableDynamicField() { + return nil + } + + dynamicFieldID := int64(-1) + for i := 0; i < len(collectionSchema.Fields); i++ { + schema := collectionSchema.Fields[i] + if schema.GetIsDynamic() { + dynamicFieldID = schema.GetFieldID() + break + } + } + + if dynamicFieldID < 0 { + return fmt.Errorf("the collection schema is dynamic but dynamic field is not found") + } + + rowCount := 0 + if len(blockData) > 0 { + for _, v := range blockData { + rowCount = v.RowNum() + } + } + + _, ok := blockData[dynamicFieldID] + if !ok { + data := &storage.JSONFieldData{ + Data: make([][]byte, 0), + } + + bs := []byte("{}") + for i := 0; i < rowCount; i++ { + data.Data = append(data.Data, bs) + } + blockData[dynamicFieldID] = data + } + + return nil +} + // tryFlushBlocks does the two things: // 1. if accumulate data of a block exceed blockSize, call callFlushFunc to generate new binlog file // 2. if total accumulate data exceed maxTotalSize, call callFlushFUnc to flush the biggest block @@ -407,7 +457,12 @@ func tryFlushBlocks(ctx context.Context, // force to flush, called at the end of Read() if force && rowCount > 0 { printFieldsDataInfo(blockData, "import util: prepare to force flush a block", nil) - err := callFlushFunc(blockData, i) + err := fillDynamicData(blockData, collectionSchema) + if err != nil { + log.Error("Import util: failed to fill dynamic field", zap.Error(err)) + return fmt.Errorf("failed to fill dynamic field, error: %w", err) + } + err = callFlushFunc(blockData, i) if err != nil { log.Error("Import util: failed to force flush block data", zap.Int("shardID", i), zap.Error(err)) return fmt.Errorf("failed to force flush block data for shard id %d, error: %w", i, err) @@ -426,7 +481,12 @@ func tryFlushBlocks(ctx context.Context, // initialize a new FieldData list for next round batch read if size > int(blockSize) && rowCount > 0 { printFieldsDataInfo(blockData, "import util: prepare to flush block larger than blockSize", nil) - err := callFlushFunc(blockData, i) + err := fillDynamicData(blockData, collectionSchema) + if err != nil { + log.Error("Import util: failed to fill dynamic field", zap.Error(err)) + return fmt.Errorf("failed to fill dynamic field, error: %w", err) + } + err = callFlushFunc(blockData, i) if err != nil { log.Error("Import util: failed to flush block data", zap.Int("shardID", i), zap.Error(err)) return fmt.Errorf("failed to flush block data for shard id %d, error: %w", i, err) @@ -470,7 +530,12 @@ func tryFlushBlocks(ctx context.Context, if rowCount > 0 { printFieldsDataInfo(blockData, "import util: prepare to flush biggest block", nil) - err := callFlushFunc(blockData, biggestItem) + err := fillDynamicData(blockData, collectionSchema) + if err != nil { + log.Error("Import util: failed to fill dynamic field", zap.Error(err)) + return fmt.Errorf("failed to fill dynamic field, error: %w", err) + } + err = callFlushFunc(blockData, biggestItem) if err != nil { log.Error("Import util: failed to flush biggest block data", zap.Int("shardID", biggestItem)) return fmt.Errorf("failed to flush biggest block data for shard id %d, error: %w", biggestItem, err) diff --git a/internal/util/importutil/import_util_test.go b/internal/util/importutil/import_util_test.go index 05dbd8a5e4ab6..7f3c00a66154f 100644 --- a/internal/util/importutil/import_util_test.go +++ b/internal/util/importutil/import_util_test.go @@ -393,6 +393,10 @@ func Test_InitValidators(t *testing.T) { validVal = "aa" checkConvertFunc("FieldString", validVal, nil) + validVal = map[string]interface{}{"x": 5, "y": true, "z": "hello"} + checkConvertFunc("FieldJSON", validVal, nil) + checkConvertFunc("FieldJSON", "{\"x\": 8}", "{") + // the binary vector dimension is 16, shoud input two uint8 values, each value should between 0~255 validVal = []interface{}{jsonNumber("100"), jsonNumber("101")} invalidVal = []interface{}{jsonNumber("100"), jsonNumber("1256")} @@ -540,6 +544,97 @@ func Test_GetFieldDimension(t *testing.T) { assert.Equal(t, 0, dim) } +func Test_FillDynamicData(t *testing.T) { + ctx := context.Background() + + schema := &schemapb.CollectionSchema{ + Name: "schema", + Description: "schema", + EnableDynamicField: true, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 106, + Name: "FieldID", + IsPrimaryKey: true, + AutoID: false, + Description: "int64", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 113, + Name: "FieldDynamic", + IsPrimaryKey: false, + IsDynamic: true, + Description: "dynamic field", + DataType: schemapb.DataType_JSON, + }, + }, + } + + flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error { + return nil + } + + rowCount := 1000 + idData := &storage.Int64FieldData{ + Data: make([]int64, 0), + } + for i := 0; i < rowCount; i++ { + idData.Data = append(idData.Data, int64(i)) // this is primary key + } + + t.Run("dynamic field is filled", func(t *testing.T) { + blockData := map[storage.FieldID]storage.FieldData{ + 106: idData, + } + + segmentsData := []map[storage.FieldID]storage.FieldData{ + blockData, + } + + err := fillDynamicData(blockData, schema) + assert.NoError(t, err) + assert.Equal(t, 2, len(blockData)) + assert.Contains(t, blockData, int64(113)) + assert.Equal(t, rowCount, blockData[113].RowNum()) + assert.Equal(t, []byte("{}"), blockData[113].GetRow(0).([]byte)) + + err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1, 1, false) + assert.NoError(t, err) + }) + + t.Run("collection is dynamic by no dynamic field", func(t *testing.T) { + blockData := map[storage.FieldID]storage.FieldData{ + 106: idData, + } + schema.Fields[1].IsDynamic = false + err := fillDynamicData(blockData, schema) + assert.Error(t, err) + + segmentsData := []map[storage.FieldID]storage.FieldData{ + blockData, + } + + err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1024*1024, 1, true) + assert.Error(t, err) + + err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1024, 1, false) + assert.Error(t, err) + + err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1024*1024, 1, false) + assert.Error(t, err) + }) + + t.Run("collection is not dynamic", func(t *testing.T) { + blockData := map[storage.FieldID]storage.FieldData{ + 106: idData, + } + schema.EnableDynamicField = false + err := fillDynamicData(blockData, schema) + assert.NoError(t, err) + }) +} + func Test_TryFlushBlocks(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go index 485c080b44c3d..dc4f3359f9067 100644 --- a/internal/util/importutil/import_wrapper_test.go +++ b/internal/util/importutil/import_wrapper_test.go @@ -242,9 +242,9 @@ func Test_ImportWrapperRowBased(t *testing.T) { content := []byte(`{ "rows":[ - {"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": "{\"x\": 2}", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, + {"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": {"x": 2}, "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, {"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldJSON": "{\"k\": 2.5}", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]}, - {"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldJSON": "{\"y\": \"hello\"}", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]}, + {"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldJSON": {"y": "hello"}, "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]}, {"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldJSON": "{}", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]}, {"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldJSON": "{\"x\": true}", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]} ] diff --git a/internal/util/importutil/json_parser.go b/internal/util/importutil/json_parser.go index 4b67e11c1882c..33c4efc57b9ab 100644 --- a/internal/util/importutil/json_parser.go +++ b/internal/util/importutil/json_parser.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" "go.uber.org/zap" + "golang.org/x/exp/maps" ) const ( @@ -44,17 +45,17 @@ type IOReader struct { } type JSONParser struct { - ctx context.Context // for canceling parse process - bufRowCount int // max rows in a buffer - fields map[string]int64 // fields need to be parsed + ctx context.Context // for canceling parse process + bufRowCount int // max rows in a buffer name2FieldID map[string]storage.FieldID updateProgressFunc func(percent int64) // update working progress percent value + dynamicFieldID storage.FieldID // dynamic field id, set to -1 if no dynamic field } // NewJSONParser helper function to create a JSONParser func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSchema, updateProgressFunc func(percent int64)) *JSONParser { - fields := make(map[string]int64) name2FieldID := make(map[string]storage.FieldID) + dynamicFieldID := int64(-1) for i := 0; i < len(collectionSchema.Fields); i++ { schema := collectionSchema.Fields[i] // RowIDField and TimeStampField is internal field, no need to parse @@ -66,16 +67,18 @@ func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSch continue } - fields[schema.GetName()] = 0 name2FieldID[schema.GetName()] = schema.GetFieldID() + if schema.GetIsDynamic() && collectionSchema.GetEnableDynamicField() { + dynamicFieldID = schema.GetFieldID() + } } parser := &JSONParser{ ctx: ctx, bufRowCount: 1024, - fields: fields, name2FieldID: name2FieldID, updateProgressFunc: updateProgressFunc, + dynamicFieldID: dynamicFieldID, } adjustBufSize(parser, collectionSchema) @@ -108,6 +111,50 @@ func adjustBufSize(parser *JSONParser, collectionSchema *schemapb.CollectionSche parser.bufRowCount = bufRowCount } +func (p *JSONParser) combineDynamicRow(dynamicValues map[string]interface{}, row map[storage.FieldID]interface{}) error { + if p.dynamicFieldID < 0 { + return nil + } + // combine the dynamic field value + // valid input: + // case 1: {"id": 1, "vector": [], "x": 8, "$meta": "{\"y\": 8}"} + // case 2: {"id": 1, "vector": [], "x": 8, "$meta": {}} + // case 3: {"id": 1, "vector": [], "$meta": "{\"x\": 8}"} + // case 4: {"id": 1, "vector": [], "$meta": {"x": 8}} + // case 5: {"id": 1, "vector": [], "$meta": {}} + // case 6: {"id": 1, "vector": [], "x": 8} + // case 7: {"id": 1, "vector": []} + obj, ok := row[p.dynamicFieldID] + if ok { + if len(dynamicValues) > 0 { + if value, is := obj.(string); is { + // case 1 + mp := make(map[string]interface{}) + json.Unmarshal([]byte(value), &mp) + maps.Copy(dynamicValues, mp) + } else if mp, is := obj.(map[string]interface{}); is { + // case 2 + maps.Copy(dynamicValues, mp) + } else { + // invalid input + return errors.New("illegal value for dynamic field") + } + row[p.dynamicFieldID] = dynamicValues + } + // else case 3/4/5 + } else { + if len(dynamicValues) > 0 { + // case 6 + row[p.dynamicFieldID] = dynamicValues + } else { + // case 7 + row[p.dynamicFieldID] = "{}" + } + } + + return nil +} + func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{}, error) { stringMap, ok := raw.(map[string]interface{}) if !ok { @@ -115,20 +162,29 @@ func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{} return nil, errors.New("invalid JSON format, each row should be a key-value map") } + dynamicValues := make(map[string]interface{}) row := make(map[storage.FieldID]interface{}) for k, v := range stringMap { - // if user provided redundant field, return error fieldID, ok := p.name2FieldID[k] - if !ok { + if ok { + row[fieldID] = v + } else if p.dynamicFieldID >= 0 { + // has dynamic field. put redundant pair to dynamicValues + dynamicValues[k] = v + } else { + // no dynamic field. if user provided redundant field, return error log.Error("JSON parser: the field is not defined in collection schema", zap.String("fieldName", k)) return nil, fmt.Errorf("the field '%s' is not defined in collection schema", k) } - row[fieldID] = v } // some fields not provided? if len(row) != len(p.name2FieldID) { for k, v := range p.name2FieldID { + if v == p.dynamicFieldID { + // dyanmic field, allow user ignore this field + continue + } _, ok := row[v] if !ok { log.Error("JSON parser: a field value is missed", zap.String("fieldName", k)) @@ -137,7 +193,9 @@ func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{} } } - return row, nil + // combine the redundant pairs into dunamic field(if has) + err := p.combineDynamicRow(dynamicValues, row) + return row, err } func (p *JSONParser) ParseRows(reader *IOReader, handler JSONRowHandler) error { diff --git a/internal/util/importutil/json_parser_test.go b/internal/util/importutil/json_parser_test.go index 7940989b97c3e..a56150576ad1f 100644 --- a/internal/util/importutil/json_parser_test.go +++ b/internal/util/importutil/json_parser_test.go @@ -385,3 +385,189 @@ func Test_JSONParserParseRows_StrPK(t *testing.T) { } } } + +func Test_JSONParserCombineDynamicRow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + schema := &schemapb.CollectionSchema{ + Name: "schema", + Description: "schema", + EnableDynamicField: true, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 106, + Name: "FieldID", + IsPrimaryKey: true, + AutoID: false, + Description: "int64", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 113, + Name: "FieldDynamic", + IsPrimaryKey: false, + IsDynamic: true, + Description: "dynamic field", + DataType: schemapb.DataType_JSON, + }, + }, + } + parser := NewJSONParser(ctx, schema, nil) + assert.NotNil(t, parser) + + // valid input: + // case 1: {"id": 1, "vector": [], "x": 8, "$meta": "{\"y\": 8}"} + // case 2: {"id": 1, "vector": [], "x": 8, "$meta": {}} + // case 3: {"id": 1, "vector": [], "$meta": "{\"x\": 8}"} + // case 4: {"id": 1, "vector": [], "$meta": {"x": 8}} + // case 5: {"id": 1, "vector": [], "$meta": {}} + // case 6: {"id": 1, "vector": [], "x": 8} + // case 7: {"id": 1, "vector": []} + + // case 1 + dynamicValues := map[string]interface{}{ + "x": 8, + } + row := map[storage.FieldID]interface{}{ + 106: 1, + 113: "{\"y\": 8}", + } + err := parser.combineDynamicRow(dynamicValues, row) + assert.NoError(t, err) + assert.Contains(t, row, int64(113)) + assert.Contains(t, row[113], "x") + assert.Contains(t, row[113], "y") + + // case 2 + dynamicValues = map[string]interface{}{ + "x": 8, + } + row = map[storage.FieldID]interface{}{ + 106: 1, + 113: map[string]interface{}{}, + } + err = parser.combineDynamicRow(dynamicValues, row) + assert.NoError(t, err) + assert.Contains(t, row, int64(113)) + assert.Contains(t, row[113], "x") + + // case 3/4/5 + dynamicValues = map[string]interface{}{} + row = map[storage.FieldID]interface{}{ + 106: 1, + 113: "{\"x\": 8}", + } + err = parser.combineDynamicRow(dynamicValues, row) + assert.NoError(t, err) + assert.Contains(t, row, int64(113)) + + // case 6 + dynamicValues = map[string]interface{}{ + "x": 8, + } + row = map[storage.FieldID]interface{}{ + 106: 1, + } + err = parser.combineDynamicRow(dynamicValues, row) + assert.NoError(t, err) + assert.Contains(t, row, int64(113)) + assert.Contains(t, row[113], "x") + + // case 7 + dynamicValues = map[string]interface{}{} + row = map[storage.FieldID]interface{}{ + 106: 1, + } + err = parser.combineDynamicRow(dynamicValues, row) + assert.NoError(t, err) + assert.Contains(t, row, int64(113)) + assert.Equal(t, "{}", row[113]) + + // invalid input + dynamicValues = map[string]interface{}{ + "x": 8, + } + row = map[storage.FieldID]interface{}{ + 106: 1, + 113: 5, + } + err = parser.combineDynamicRow(dynamicValues, row) + assert.Error(t, err) + + // no dynamic field + parser.dynamicFieldID = -1 + dynamicValues = map[string]interface{}{ + "x": 8, + } + row = map[storage.FieldID]interface{}{ + 106: 1, + } + err = parser.combineDynamicRow(dynamicValues, row) + assert.NoError(t, err) + assert.NotContains(t, row, int64(113)) +} + +func Test_JSONParserVerifyRow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + schema := &schemapb.CollectionSchema{ + Name: "schema", + Description: "schema", + EnableDynamicField: true, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 106, + Name: "FieldID", + IsPrimaryKey: true, + AutoID: false, + Description: "int64", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 113, + Name: "FieldDynamic", + IsPrimaryKey: false, + IsDynamic: true, + Description: "dynamic field", + DataType: schemapb.DataType_JSON, + }, + }, + } + parser := NewJSONParser(ctx, schema, nil) + assert.NotNil(t, parser) + assert.Equal(t, int64(113), parser.dynamicFieldID) + + // dynamic field provided + raw := map[string]interface{}{ + "FieldID": 100, + "FieldDynamic": "{\"x\": 8}", + "y": true, + } + row, err := parser.verifyRow(raw) + assert.NoError(t, err) + assert.Contains(t, row, int64(106)) + assert.Contains(t, row, int64(113)) + assert.Contains(t, row[113], "x") + assert.Contains(t, row[113], "y") + + // dynamic field not provided + raw = map[string]interface{}{ + "FieldID": 100, + } + row, err = parser.verifyRow(raw) + assert.NoError(t, err) + assert.Contains(t, row, int64(106)) + assert.Contains(t, row, int64(113)) + assert.Equal(t, "{}", row[113]) + + // invalid input dynamic field + raw = map[string]interface{}{ + "FieldID": 100, + "FieldDynamic": true, + "y": true, + } + _, err = parser.verifyRow(raw) + assert.Error(t, err) +} diff --git a/internal/util/importutil/numpy_parser.go b/internal/util/importutil/numpy_parser.go index 266137a41af3c..24df73b70bfe6 100644 --- a/internal/util/importutil/numpy_parser.go +++ b/internal/util/importutil/numpy_parser.go @@ -143,8 +143,12 @@ func (p *NumpyParser) Parse(filePaths []string) error { // validateFileNames is to check redundant file and missed file func (p *NumpyParser) validateFileNames(filePaths []string) error { + dynamicFieldName := "" requiredFieldNames := make(map[string]interface{}) for _, schema := range p.collectionSchema.Fields { + if schema.GetIsDynamic() && p.collectionSchema.GetEnableDynamicField() { + dynamicFieldName = schema.GetName() + } if schema.GetIsPrimaryKey() { if !schema.GetAutoID() { requiredFieldNames[schema.GetName()] = nil @@ -168,6 +172,10 @@ func (p *NumpyParser) validateFileNames(filePaths []string) error { // check missed file for name := range requiredFieldNames { + if name == dynamicFieldName { + // dynamic schema field file is not required + continue + } _, ok := fileNames[name] if !ok { log.Error("Numpy parser: there is no file corresponding to field", zap.String("fieldName", name))