[cherry-pick] Support dynamic field for bulkinsert #24304

Merged · merged 1 commit on May 23, 2023

71 changes: 68 additions & 3 deletions internal/util/importutil/import_util.go
@@ -309,13 +309,21 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
}
case schemapb.DataType_JSON:
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
// for JSON data, we accept two kinds of input: a JSON-encoded string and a map[string]interface{}
// the user can write JSON content as {"FieldJSON": "{\"x\": 8}"} or {"FieldJSON": {"x": 8}}
if value, ok := obj.(string); ok {
var dummy interface{}
err := json.Unmarshal([]byte(value), &dummy)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for JSON field '%s', error: %w", value, schema.GetName(), err)
}
field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, []byte(value))
} else if mp, ok := obj.(map[string]interface{}); ok {
bs, err := json.Marshal(mp)
if err != nil {
return fmt.Errorf("failed to parse value for JSON field '%s', error: %w", schema.GetName(), err)
}
field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, bs)
} else {
return fmt.Errorf("illegal value '%v' for JSON type field '%s'", obj, schema.GetName())
}
@@ -372,6 +380,48 @@ func triggerGC() {
debug.FreeOSMemory()
}

// if the user didn't provide dynamic data, fill the dynamic field with "{}"
func fillDynamicData(blockData map[storage.FieldID]storage.FieldData, collectionSchema *schemapb.CollectionSchema) error {
if !collectionSchema.GetEnableDynamicField() {
return nil
}

dynamicFieldID := int64(-1)
for i := 0; i < len(collectionSchema.Fields); i++ {
schema := collectionSchema.Fields[i]
if schema.GetIsDynamic() {
dynamicFieldID = schema.GetFieldID()
break
}
}

if dynamicFieldID < 0 {
return fmt.Errorf("the collection schema is dynamic but dynamic field is not found")
}

rowCount := 0
if len(blockData) > 0 {
for _, v := range blockData {
rowCount = v.RowNum()
}
}

_, ok := blockData[dynamicFieldID]
if !ok {
data := &storage.JSONFieldData{
Data: make([][]byte, 0),
}

bs := []byte("{}")
for i := 0; i < rowCount; i++ {
data.Data = append(data.Data, bs)
}
blockData[dynamicFieldID] = data
}

return nil
}

// tryFlushBlocks does two things:
// 1. if the accumulated data of a block exceeds blockSize, call callFlushFunc to generate a new binlog file
// 2. if the total accumulated data exceeds maxTotalSize, call callFlushFunc to flush the biggest block
@@ -407,7 +457,12 @@ func tryFlushBlocks(ctx context.Context,
// force to flush, called at the end of Read()
if force && rowCount > 0 {
printFieldsDataInfo(blockData, "import util: prepare to force flush a block", nil)
- err := callFlushFunc(blockData, i)
+ err := fillDynamicData(blockData, collectionSchema)
+ if err != nil {
+ log.Error("Import util: failed to fill dynamic field", zap.Error(err))
+ return fmt.Errorf("failed to fill dynamic field, error: %w", err)
+ }
+ err = callFlushFunc(blockData, i)
if err != nil {
log.Error("Import util: failed to force flush block data", zap.Int("shardID", i), zap.Error(err))
return fmt.Errorf("failed to force flush block data for shard id %d, error: %w", i, err)
@@ -426,7 +481,12 @@
// initialize a new FieldData list for next round batch read
if size > int(blockSize) && rowCount > 0 {
printFieldsDataInfo(blockData, "import util: prepare to flush block larger than blockSize", nil)
- err := callFlushFunc(blockData, i)
+ err := fillDynamicData(blockData, collectionSchema)
+ if err != nil {
+ log.Error("Import util: failed to fill dynamic field", zap.Error(err))
+ return fmt.Errorf("failed to fill dynamic field, error: %w", err)
+ }
+ err = callFlushFunc(blockData, i)
if err != nil {
log.Error("Import util: failed to flush block data", zap.Int("shardID", i), zap.Error(err))
return fmt.Errorf("failed to flush block data for shard id %d, error: %w", i, err)
@@ -470,7 +530,12 @@

if rowCount > 0 {
printFieldsDataInfo(blockData, "import util: prepare to flush biggest block", nil)
- err := callFlushFunc(blockData, biggestItem)
+ err := fillDynamicData(blockData, collectionSchema)
+ if err != nil {
+ log.Error("Import util: failed to fill dynamic field", zap.Error(err))
+ return fmt.Errorf("failed to fill dynamic field, error: %w", err)
+ }
+ err = callFlushFunc(blockData, biggestItem)
if err != nil {
log.Error("Import util: failed to flush biggest block data", zap.Int("shardID", biggestItem))
return fmt.Errorf("failed to flush biggest block data for shard id %d, error: %w", biggestItem, err)
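Taken together, the import_util.go changes above do two things: the JSON convertFunc now accepts a field value either as a JSON-encoded string or as a decoded object, and fillDynamicData pads blocks that carry no dynamic data with one "{}" per row. The following is a minimal, self-contained sketch of the conversion rule only; toJSONBytes is an illustrative name, not code from this PR:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toJSONBytes normalizes the two accepted input shapes for a JSON field:
// a JSON-encoded string such as "{\"x\": 8}" or an already-decoded
// map[string]interface{} such as {"x": 8}. Both end up as raw JSON bytes.
func toJSONBytes(obj interface{}) ([]byte, error) {
	switch v := obj.(type) {
	case string:
		// validate the string before storing it verbatim
		var dummy interface{}
		if err := json.Unmarshal([]byte(v), &dummy); err != nil {
			return nil, fmt.Errorf("failed to parse value '%v': %w", v, err)
		}
		return []byte(v), nil
	case map[string]interface{}:
		return json.Marshal(v)
	default:
		return nil, fmt.Errorf("illegal value '%v' for JSON type field", obj)
	}
}

func main() {
	for _, in := range []interface{}{`{"x": 8}`, map[string]interface{}{"x": 8}} {
		bs, err := toJSONBytes(in)
		fmt.Println(string(bs), err) // both produce valid JSON bytes, err == nil
	}
}
```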
95 changes: 95 additions & 0 deletions internal/util/importutil/import_util_test.go
@@ -393,6 +393,10 @@ func Test_InitValidators(t *testing.T) {
validVal = "aa"
checkConvertFunc("FieldString", validVal, nil)

validVal = map[string]interface{}{"x": 5, "y": true, "z": "hello"}
checkConvertFunc("FieldJSON", validVal, nil)
checkConvertFunc("FieldJSON", "{\"x\": 8}", "{")

// the binary vector dimension is 16, so the input should be two uint8 values, each between 0 and 255
validVal = []interface{}{jsonNumber("100"), jsonNumber("101")}
invalidVal = []interface{}{jsonNumber("100"), jsonNumber("1256")}
@@ -540,6 +544,97 @@ func Test_GetFieldDimension(t *testing.T) {
assert.Equal(t, 0, dim)
}

func Test_FillDynamicData(t *testing.T) {
ctx := context.Background()

schema := &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
EnableDynamicField: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 106,
Name: "FieldID",
IsPrimaryKey: true,
AutoID: false,
Description: "int64",
DataType: schemapb.DataType_Int64,
},
{
FieldID: 113,
Name: "FieldDynamic",
IsPrimaryKey: false,
IsDynamic: true,
Description: "dynamic field",
DataType: schemapb.DataType_JSON,
},
},
}

flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return nil
}

rowCount := 1000
idData := &storage.Int64FieldData{
Data: make([]int64, 0),
}
for i := 0; i < rowCount; i++ {
idData.Data = append(idData.Data, int64(i)) // this is primary key
}

t.Run("dynamic field is filled", func(t *testing.T) {
blockData := map[storage.FieldID]storage.FieldData{
106: idData,
}

segmentsData := []map[storage.FieldID]storage.FieldData{
blockData,
}

err := fillDynamicData(blockData, schema)
assert.NoError(t, err)
assert.Equal(t, 2, len(blockData))
assert.Contains(t, blockData, int64(113))
assert.Equal(t, rowCount, blockData[113].RowNum())
assert.Equal(t, []byte("{}"), blockData[113].GetRow(0).([]byte))

err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1, 1, false)
assert.NoError(t, err)
})

t.Run("collection is dynamic by no dynamic field", func(t *testing.T) {
blockData := map[storage.FieldID]storage.FieldData{
106: idData,
}
schema.Fields[1].IsDynamic = false
err := fillDynamicData(blockData, schema)
assert.Error(t, err)

segmentsData := []map[storage.FieldID]storage.FieldData{
blockData,
}

err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1024*1024, 1, true)
assert.Error(t, err)

err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1024, 1, false)
assert.Error(t, err)

err = tryFlushBlocks(ctx, segmentsData, schema, flushFunc, 1024*1024, 1, false)
assert.Error(t, err)
})

t.Run("collection is not dynamic", func(t *testing.T) {
blockData := map[storage.FieldID]storage.FieldData{
106: idData,
}
schema.EnableDynamicField = false
err := fillDynamicData(blockData, schema)
assert.NoError(t, err)
})
}

func Test_TryFlushBlocks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

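The padding rule that Test_FillDynamicData asserts can be reproduced in isolation; this is an illustrative sketch (padDynamic is a hypothetical helper, not part of the PR):

```go
package main

import "fmt"

// padDynamic mirrors the rule the test asserts: a block with no data for
// the dynamic field gets one empty JSON object per existing row, so all
// field columns stay row-aligned.
func padDynamic(rowCount int) [][]byte {
	data := make([][]byte, 0, rowCount)
	for i := 0; i < rowCount; i++ {
		data = append(data, []byte("{}"))
	}
	return data
}

func main() {
	rows := padDynamic(1000)
	fmt.Println(len(rows), string(rows[0])) // 1000 {}
}
```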
4 changes: 2 additions & 2 deletions internal/util/importutil/import_wrapper_test.go
@@ -242,9 +242,9 @@ func Test_ImportWrapperRowBased(t *testing.T) {

content := []byte(`{
"rows":[
{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": "{\"x\": 2}", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": {"x": 2}, "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
{"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldJSON": "{\"k\": 2.5}", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]},
{"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldJSON": "{\"y\": \"hello\"}", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]},
{"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldJSON": {"y": "hello"}, "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]},
{"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldJSON": "{}", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]},
{"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldJSON": "{\"x\": true}", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]}
]
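The rewritten rows above deliberately mix the two encodings. When a row is decoded into map[string]interface{}, a string-encoded JSON field arrives as a Go string while an object-encoded one arrives as a nested map, which is exactly the distinction the convertFunc branches on. A standard-library-only sketch:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	rows := []string{
		`{"FieldJSON": "{\"x\": 2}"}`, // JSON value passed as a string
		`{"FieldJSON": {"x": 2}}`,     // JSON value passed as an object
	}
	for _, r := range rows {
		var m map[string]interface{}
		if err := json.Unmarshal([]byte(r), &m); err != nil {
			panic(err)
		}
		// prints: string, then map[string]interface {}
		fmt.Printf("%T\n", m["FieldJSON"])
	}
}
```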
78 changes: 68 additions & 10 deletions internal/util/importutil/json_parser.go
@@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

const (
@@ -44,17 +45,17 @@ type IOReader struct {
}

type JSONParser struct {
- ctx context.Context // for canceling parse process
- bufRowCount int // max rows in a buffer
- fields map[string]int64 // fields need to be parsed
+ ctx context.Context // for canceling parse process
+ bufRowCount int // max rows in a buffer
+ name2FieldID map[string]storage.FieldID
updateProgressFunc func(percent int64) // update working progress percent value
+ dynamicFieldID storage.FieldID // dynamic field id, set to -1 if no dynamic field
}

// NewJSONParser helper function to create a JSONParser
func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSchema, updateProgressFunc func(percent int64)) *JSONParser {
- fields := make(map[string]int64)
+ name2FieldID := make(map[string]storage.FieldID)
+ dynamicFieldID := int64(-1)
for i := 0; i < len(collectionSchema.Fields); i++ {
schema := collectionSchema.Fields[i]
// RowIDField and TimeStampField is internal field, no need to parse
@@ -66,16 +67,18 @@ func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSch
continue
}

- fields[schema.GetName()] = 0
+ name2FieldID[schema.GetName()] = schema.GetFieldID()
+ if schema.GetIsDynamic() && collectionSchema.GetEnableDynamicField() {
+ dynamicFieldID = schema.GetFieldID()
+ }
}

parser := &JSONParser{
ctx: ctx,
bufRowCount: 1024,
- fields: fields,
+ name2FieldID: name2FieldID,
updateProgressFunc: updateProgressFunc,
+ dynamicFieldID: dynamicFieldID,
}
adjustBufSize(parser, collectionSchema)

@@ -108,27 +111,80 @@ func adjustBufSize(parser *JSONParser, collectionSchema *schemapb.CollectionSche
parser.bufRowCount = bufRowCount
}

func (p *JSONParser) combineDynamicRow(dynamicValues map[string]interface{}, row map[storage.FieldID]interface{}) error {
if p.dynamicFieldID < 0 {
return nil
}
// combine the dynamic field value
// valid input:
// case 1: {"id": 1, "vector": [], "x": 8, "$meta": "{\"y\": 8}"}
// case 2: {"id": 1, "vector": [], "x": 8, "$meta": {}}
// case 3: {"id": 1, "vector": [], "$meta": "{\"x\": 8}"}
// case 4: {"id": 1, "vector": [], "$meta": {"x": 8}}
// case 5: {"id": 1, "vector": [], "$meta": {}}
// case 6: {"id": 1, "vector": [], "x": 8}
// case 7: {"id": 1, "vector": []}
obj, ok := row[p.dynamicFieldID]
if ok {
if len(dynamicValues) > 0 {
if value, is := obj.(string); is {
// case 1
mp := make(map[string]interface{})
json.Unmarshal([]byte(value), &mp)
maps.Copy(dynamicValues, mp)
} else if mp, is := obj.(map[string]interface{}); is {
// case 2
maps.Copy(dynamicValues, mp)
} else {
// invalid input
return errors.New("illegal value for dynamic field")
}
row[p.dynamicFieldID] = dynamicValues
}
// else case 3/4/5
} else {
if len(dynamicValues) > 0 {
// case 6
row[p.dynamicFieldID] = dynamicValues
} else {
// case 7
row[p.dynamicFieldID] = "{}"
}
}

return nil
}

func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{}, error) {
stringMap, ok := raw.(map[string]interface{})
if !ok {
log.Error("JSON parser: invalid JSON format, each row should be a key-value map")
return nil, errors.New("invalid JSON format, each row should be a key-value map")
}

dynamicValues := make(map[string]interface{})
row := make(map[storage.FieldID]interface{})
for k, v := range stringMap {
- // if user provided redundant field, return error
fieldID, ok := p.name2FieldID[k]
- if !ok {
+ if ok {
+ row[fieldID] = v
+ } else if p.dynamicFieldID >= 0 {
+ // has dynamic field, put the redundant pair into dynamicValues
+ dynamicValues[k] = v
+ } else {
+ // no dynamic field; a field not defined in the collection schema is an error
log.Error("JSON parser: the field is not defined in collection schema", zap.String("fieldName", k))
return nil, fmt.Errorf("the field '%s' is not defined in collection schema", k)
}
- row[fieldID] = v
}

// some fields not provided?
if len(row) != len(p.name2FieldID) {
for k, v := range p.name2FieldID {
if v == p.dynamicFieldID {
// dynamic field, allow the user to omit this field
continue
}
_, ok := row[v]
if !ok {
log.Error("JSON parser: a field value is missed", zap.String("fieldName", k))
Expand All @@ -137,7 +193,9 @@ func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{}
}
}

- return row, nil
+ // combine the redundant pairs into the dynamic field (if any)
+ err := p.combineDynamicRow(dynamicValues, row)
+ return row, err
}

func (p *JSONParser) ParseRows(reader *IOReader, handler JSONRowHandler) error {
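To make the seven input cases above concrete, here is a hedged, standalone sketch of the merge rule; combine is a hypothetical simplification of combineDynamicRow (unlike the PR code, it surfaces the json.Unmarshal error instead of ignoring it):

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// combine merges loose (non-schema) keys with an explicit "$meta" value.
// meta is the raw "$meta" value if present; extras holds the loose keys.
func combine(meta interface{}, hasMeta bool, extras map[string]interface{}) (interface{}, error) {
	if !hasMeta {
		if len(extras) > 0 {
			return extras, nil // case 6: loose keys only
		}
		return "{}", nil // case 7: no dynamic data at all
	}
	if len(extras) == 0 {
		return meta, nil // cases 3/4/5: "$meta" given, no loose keys
	}
	switch v := meta.(type) {
	case string: // case 1: "$meta" is a JSON-encoded string
		mp := make(map[string]interface{})
		if err := json.Unmarshal([]byte(v), &mp); err != nil {
			return nil, err
		}
		for k, val := range mp {
			extras[k] = val // "$meta" values win on key clashes, like maps.Copy
		}
		return extras, nil
	case map[string]interface{}: // case 2: "$meta" is an object
		for k, val := range v {
			extras[k] = val
		}
		return extras, nil
	default:
		return nil, errors.New("illegal value for dynamic field")
	}
}

func main() {
	merged, err := combine(`{"y": 8}`, true, map[string]interface{}{"x": 8})
	fmt.Println(merged, err) // map[x:8 y:8] <nil>
}
```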