Skip to content

Commit

Permalink
Support anonymous dynamic schema
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed May 23, 2023
1 parent 2588edc commit d5e2f61
Show file tree
Hide file tree
Showing 15 changed files with 38 additions and 95 deletions.
3 changes: 2 additions & 1 deletion client/client_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (s *MockSuiteBase) getVarcharFieldData(name string, data []string) *schema.
}
}

func (s *MockSuiteBase) getJSONBytesFieldData(name string, data [][]byte) *schema.FieldData {
func (s *MockSuiteBase) getJSONBytesFieldData(name string, data [][]byte, isDynamic bool) *schema.FieldData {
return &schema.FieldData{
Type: schema.DataType_JSON,
FieldName: name,
Expand All @@ -197,6 +197,7 @@ func (s *MockSuiteBase) getJSONBytesFieldData(name string, data [][]byte) *schem
},
},
},
IsDynamic: isDynamic,
}
}

Expand Down
7 changes: 2 additions & 5 deletions client/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ func (c *GrpcClient) Search(ctx context.Context, collName string, partitions []s
}

func (c *GrpcClient) parseSearchResult(sch *entity.Schema, outputFields []string, fieldDataList []*schema.FieldData, idx, from, to int) ([]entity.Column, error) {
dynamicField := sch.GetDynamicField()
//dynamicField := sch.GetDynamicField()
fields := make(map[string]*schema.FieldData)
var dynamicColumn *entity.ColumnJSONBytes
for _, fieldData := range fieldDataList {
fields[fieldData.GetFieldName()] = fieldData
if dynamicField != nil && fieldData.GetFieldName() == dynamicField.Name {
if fieldData.GetIsDynamic() {
column, err := entity.FieldDataColumn(fieldData, from, to)
if err != nil {
return nil, err
Expand All @@ -115,9 +115,6 @@ func (c *GrpcClient) parseSearchResult(sch *entity.Schema, outputFields []string
var column entity.Column
var err error
if !ok {
if dynamicField == nil {
return nil, errors.New("output fields not match when dynamic field disabled")
}
if dynamicColumn == nil {
return nil, errors.New("output fields not match and result field data does not contain dynamic field")
}
Expand Down
6 changes: 3 additions & 3 deletions client/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,10 @@ func (s *SearchSuite) TestSearchSuccess() {
NumQueries: 1,
TopK: 2,
FieldsData: []*schema.FieldData{
s.getJSONBytesFieldData("$meta", [][]byte{
s.getJSONBytesFieldData("", [][]byte{
[]byte(`{"A": 123, "B": "456"}`),
[]byte(`{"B": "abc", "A": 456}`),
}),
}, true),
},
Ids: &schema.IDs{
IdField: &schema.IDs_IntId{
Expand Down Expand Up @@ -663,7 +663,7 @@ func (s *QuerySuite) TestQuerySuccess() {
s.getJSONBytesFieldData("$meta", [][]byte{
[]byte(`{"A": 123, "B": "456"}`),
[]byte(`{"B": "abc", "A": 456}`),
}),
}, true),
},
}, nil)

Expand Down
14 changes: 4 additions & 10 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (c *GrpcClient) Insert(ctx context.Context, collName string, partitionName
return nil, err
}

// convert columns to field data
fieldsData, rowSize, err := c.processInsertColumns(coll.Schema, columns...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -84,15 +85,6 @@ func (c *GrpcClient) Insert(ctx context.Context, collName string, partitionName
func (c *GrpcClient) processInsertColumns(colSchema *entity.Schema, columns ...entity.Column) ([]*schema.FieldData, int, error) {
// setup dynamic related var
isDynamic := colSchema.EnableDynamicField
var dynamicField *entity.Field
if isDynamic {
for _, field := range colSchema.Fields {
if field.IsDynamic {
dynamicField = field
break
}
}
}

// check columns and field matches
var rowSize int
Expand Down Expand Up @@ -157,7 +149,8 @@ func (c *GrpcClient) processInsertColumns(colSchema *entity.Schema, columns ...e
fieldsData = append(fieldsData, fixedColumn.FieldData())
}
if len(dynamicColumns) > 0 {
col, err := c.mergeDynamicColumns(dynamicField.Name, rowSize, dynamicColumns)
// use empty column name here
col, err := c.mergeDynamicColumns("", rowSize, dynamicColumns)
if err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -193,6 +186,7 @@ func (c *GrpcClient) mergeDynamicColumns(dynamicName string, rowSize int, column
},
},
},
IsDynamic: true,
}, nil
}

Expand Down
9 changes: 4 additions & 5 deletions client/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
"context"
"testing"

"github.com/cockroachdb/errors"
common "github.com/milvus-io/milvus-proto/go-api/commonpb"
server "github.com/milvus-io/milvus-proto/go-api/milvuspb"
schema "github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
)

type InsertSuite struct {
Expand Down Expand Up @@ -178,7 +178,7 @@ func (s *InsertSuite) TestInsertFail() {
)

s.mock.EXPECT().Insert(mock.Anything, mock.AnythingOfType("*milvuspb.InsertRequest")).Return(
nil, grpc.ErrClientConnClosing,
nil, errors.New("mocked error"),
)

_, err := c.Insert(ctx, testCollectionName, "partition_1",
Expand Down Expand Up @@ -234,16 +234,15 @@ func (s *InsertSuite) TestInsertSuccess() {
s.setupDescribeCollection(testCollectionName, entity.NewSchema().
WithDynamicFieldEnabled(true).
WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)).
WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")).
WithField(entity.NewField().WithName("$meta").WithDataType(entity.FieldTypeJSON).WithIsDynamic(true)),
WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")),
)

s.mock.EXPECT().Insert(mock.Anything, mock.AnythingOfType("*milvuspb.InsertRequest")).
Run(func(ctx context.Context, req *server.InsertRequest) {
s.Equal(2, len(req.GetFieldsData()))
var found bool
for _, fd := range req.GetFieldsData() {
if fd.GetFieldName() == "$meta" {
if fd.GetFieldName() == "" && fd.GetIsDynamic() {
found = true
break
}
Expand Down
3 changes: 1 addition & 2 deletions client/row_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,7 @@ func (s *InsertByRowsSuite) TestSuccess() {
s.setupDescribeCollection(testCollectionName, entity.NewSchema().
WithName(testCollectionName).WithDynamicFieldEnabled(true).
WithField(entity.NewField().WithName("ID").WithDataType(entity.FieldTypeInt64).WithIsPrimaryKey(true)).
WithField(entity.NewField().WithName("Vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")).
WithField(entity.NewField().WithName("$meta").WithDataType(entity.FieldTypeJSON).WithIsDynamic(true)),
WithField(entity.NewField().WithName("Vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")),
)

s.mock.EXPECT().Insert(mock.Anything, mock.AnythingOfType("*milvuspb.InsertRequest")).
Expand Down
5 changes: 3 additions & 2 deletions entity/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,14 @@ func FieldDataColumn(fd *schema.FieldData, begin, end int) (Column, error) {

case schema.DataType_JSON:
data, ok := fd.GetScalars().GetData().(*schema.ScalarField_JsonData)
isDynamic := fd.GetIsDynamic()
if !ok {
return nil, errFieldDataTypeNotMatch
}
if end < 0 {
return NewColumnJSONBytes(fd.GetFieldName(), data.JsonData.GetData()[begin:]), nil
return NewColumnJSONBytes(fd.GetFieldName(), data.JsonData.GetData()[begin:]).WithIsDynamic(isDynamic), nil
}
return NewColumnJSONBytes(fd.GetFieldName(), data.JsonData.GetData()[begin:end]), nil
return NewColumnJSONBytes(fd.GetFieldName(), data.JsonData.GetData()[begin:end]).WithIsDynamic(isDynamic), nil

case schema.DataType_FloatVector:
vectors := fd.GetVectors()
Expand Down
11 changes: 9 additions & 2 deletions entity/columns_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ var _ (Column) = (*ColumnJSONBytes)(nil)
// ColumnJSONBytes column type for JSON.
// all items are marshaled json bytes.
type ColumnJSONBytes struct {
name string
values [][]byte
name string
values [][]byte
isDynamic bool
}

// Name returns column name.
Expand Down Expand Up @@ -44,6 +45,7 @@ func (c *ColumnJSONBytes) FieldData() *schema.FieldData {
fd := &schema.FieldData{
Type: schema.DataType_JSON,
FieldName: c.name,
IsDynamic: c.isDynamic,
}

fd.Field = &schema.FieldData_Scalars{
Expand Down Expand Up @@ -83,6 +85,11 @@ func (c *ColumnJSONBytes) Data() [][]byte {
return c.values
}

func (c *ColumnJSONBytes) WithIsDynamic(isDynamic bool) *ColumnJSONBytes {
c.isDynamic = isDynamic
return c
}

// NewColumnJSONBytes composes a Column with json bytes.
func NewColumnJSONBytes(name string, values [][]byte) *ColumnJSONBytes {
return &ColumnJSONBytes{
Expand Down
3 changes: 1 addition & 2 deletions entity/columns_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (s *ColumnJSONBytesSuite) TestAttrMethods() {
columnLen := 8 + rand.Intn(10)

v := make([][]byte, columnLen)
column := NewColumnJSONBytes(columnName, v)
column := NewColumnJSONBytes(columnName, v).WithIsDynamic(true)

s.Run("test_meta", func() {
ft := FieldTypeJSON
Expand Down Expand Up @@ -70,7 +70,6 @@ func (s *ColumnJSONBytesSuite) TestAttrMethods() {
err = column.AppendValue(1)
s.Error(err)
})

}

func TestColumnJSONBytes(t *testing.T) {
Expand Down
12 changes: 4 additions & 8 deletions entity/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,7 @@ func RowsToColumns(rows []Row, schemas ...*Schema) ([]Column, error) {
case FieldTypeJSON:
data := make([][]byte, 0, rowsLen)
col := NewColumnJSONBytes(field.Name, data)
if isDynamic && field.IsDynamic {
dynamicCol = col
} else {
nameColumns[field.Name] = col
}
nameColumns[field.Name] = col
case FieldTypeFloatVector:
data := make([][]float32, 0, rowsLen)
dimStr, has := field.TypeParams[TypeParamDim]
Expand Down Expand Up @@ -318,8 +314,8 @@ func RowsToColumns(rows []Row, schemas ...*Schema) ([]Column, error) {
}
}

if isDynamic && dynamicCol == nil {
return nil, errors.New("schema dynamic field enabled but field not found")
if isDynamic {
dynamicCol = NewColumnJSONBytes("", make([][]byte, 0, rowsLen)).WithIsDynamic(true)
}

for _, row := range rows {
Expand Down Expand Up @@ -406,7 +402,7 @@ func RowsToColumns(rows []Row, schemas ...*Schema) ([]Column, error) {
for _, column := range nameColumns {
columns = append(columns, column)
}
if dynamicCol != nil {
if isDynamic {
columns = append(columns, dynamicCol)
}
return columns, nil
Expand Down
7 changes: 2 additions & 5 deletions entity/rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,7 @@ type RowsSuite struct {
func (s *RowsSuite) TestDynamicSchema() {
s.Run("all_fallback_dynamic", func() {
columns, err := RowsToColumns([]Row{&ValidStruct{}},
NewSchema().WithField(
NewField().WithName("$meta").WithDataType(FieldTypeJSON).WithIsDynamic(true),
).
WithDynamicFieldEnabled(true),
NewSchema().WithDynamicFieldEnabled(true),
)
s.NoError(err)
s.Equal(1, len(columns))
Expand All @@ -262,7 +259,7 @@ func (s *RowsSuite) TestDynamicSchema() {
NewField().WithName("ID").WithDataType(FieldTypeInt64).WithIsPrimaryKey(true),
).WithDynamicFieldEnabled(true),
)
s.Error(err)
s.NoError(err)
})
}

Expand Down
13 changes: 0 additions & 13 deletions entity/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,6 @@ func (s *Schema) WithField(f *Field) *Schema {
return s
}

// GetDynamicField returns dynamic field when EnabledDynamicField is enabled.
func (s *Schema) GetDynamicField() *Field {
if !s.EnableDynamicField {
return nil
}
for _, field := range s.Fields {
if field.IsDynamic {
return field
}
}
return nil
}

// ProtoMessage returns corresponding server.CollectionSchema
func (s *Schema) ProtoMessage() *schema.CollectionSchema {
r := &schema.CollectionSchema{
Expand Down
36 changes: 0 additions & 36 deletions entity/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,42 +107,6 @@ func (s *SchemaSuite) TestBasic() {
}
}

func (s *SchemaSuite) TestGetDynamicField() {
testCases := []struct {
tag string
input *Schema
expectValue bool
}{
{
"dynamic_field_disabled",
NewSchema().WithField(NewField().WithIsDynamic(true)),
false,
},
{
"dynamic_enabled_not_found",
NewSchema().WithDynamicFieldEnabled(true).WithField(NewField().WithName("$meta").WithIsDynamic(false)),
false,
},
{
"dynamic_enabled_found",
NewSchema().WithDynamicFieldEnabled(true).WithField(NewField().WithName("$meta").WithIsDynamic(true)),
true,
},
}

for _, c := range testCases {
s.Run(c.tag, func() {
f := c.input.GetDynamicField()
if c.expectValue {
s.Require().NotNil(f)
s.True(f.IsDynamic)
} else {
s.Nil(f)
}
})
}
}

func TestSchema(t *testing.T) {
suite.Run(t, new(SchemaSuite))
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.17
require (
github.com/golang/protobuf v1.5.2
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87
github.com/stretchr/testify v1.8.1
google.golang.org/grpc v1.48.0
google.golang.org/grpc/examples v0.0.0-20220617181431-3e7b97febc7f
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ github.com/milvus-io/milvus-proto/go-api v0.0.0-20230418025426-3735aafecc6c h1:9
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230418025426-3735aafecc6c/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63 h1:V962mSHjOFUbuMgAXziBdbYPOCVZmN1MkqEeKpME+MA=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87 h1:LdDHjEjus1NdC9ELbpQa6DfUHJotJUW2kD4S+8nvjw4=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230522080721-ef84459b8f87/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down

0 comments on commit d5e2f61

Please sign in to comment.