Skip to content

Commit

Permalink
Fix the segcore payload reader failing to read JSON data (#24212)
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <yah2er0ne@outlook.com>
  • Loading branch information
yah01 authored May 20, 2023
1 parent 4fbb3f2 commit 1eb804f
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 28 deletions.
10 changes: 10 additions & 0 deletions internal/core/src/storage/FieldData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// limitations under the License.

#include "storage/FieldData.h"
#include "common/Json.h"

namespace milvus::storage {

Expand Down Expand Up @@ -95,6 +96,15 @@ FieldDataImpl<Type, is_scalar>::FillFieldData(const std::shared_ptr<arrow::Array
}
return FillFieldData(values.data(), element_count);
}
case DataType::JSON: {
AssertInfo(array->type()->id() == arrow::Type::type::BINARY, "inconsistent data type");
auto binary_array = std::dynamic_pointer_cast<arrow::BinaryArray>(array);
std::vector<std::string> values(element_count);
for (size_t index = 0; index < element_count; ++index) {
values[index] = binary_array->GetString(index);
}
return FillFieldData(values.data(), element_count);
}
case DataType::VECTOR_FLOAT:
case DataType::VECTOR_BINARY: {
auto array_info =
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/storage/FieldDataInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "arrow/api.h"
#include "common/FieldMeta.h"
#include "common/Json.h"
#include "common/Utils.h"
#include "common/VectorTrait.h"
#include "exceptions/EasyAssert.h"
Expand Down Expand Up @@ -206,5 +207,4 @@ class FieldDataStringImpl : public FieldDataImpl<std::string, true> {
return field_data_[offset].size();
}
};

} // namespace milvus::storage
2 changes: 2 additions & 0 deletions internal/core/src/storage/parquet_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <mutex>

#include "common/Types.h"
#include "storage/parquet_c.h"
#include "storage/PayloadReader.h"
#include "storage/PayloadWriter.h"
Expand Down Expand Up @@ -219,6 +220,7 @@ NewPayloadReader(int columnType, uint8_t* buffer, int64_t buf_size, CPayloadRead
case milvus::DataType::DOUBLE:
case milvus::DataType::STRING:
case milvus::DataType::VARCHAR:
case milvus::DataType::JSON:
case milvus::DataType::VECTOR_BINARY:
case milvus::DataType::VECTOR_FLOAT: {
break;
Expand Down
26 changes: 26 additions & 0 deletions internal/core/unittest/test_parquet_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <gtest/gtest.h>
#include <fstream>

#include "common/Types.h"
#include "storage/parquet_c.h"
#include "storage/PayloadReader.h"
#include "storage/PayloadWriter.h"
Expand Down Expand Up @@ -221,6 +222,31 @@ TEST(storage, stringarray) {
ASSERT_EQ(st.error_code, ErrorCode::Success);
}

TEST(storage, jsonarray) {
    // Round-trip two JSON documents through the payload writer and reader.
    // Regression coverage: the payload reader previously rejected JSON data.
    // sizeof(literal) - 1 (excluding the trailing NUL) replaces hand-counted
    // byte lengths, which are easy to get wrong when the literals change.
    static constexpr char kJsonEmpty[] = "{}";
    static constexpr char kJsonObject[] = "{\"key\":123}";

    auto payload = NewPayloadWriter(int(milvus::DataType::JSON));
    auto st = AddOneJSONToPayload(payload, (uint8_t*)kJsonEmpty, sizeof(kJsonEmpty) - 1);
    ASSERT_EQ(st.error_code, ErrorCode::Success);
    st = AddOneJSONToPayload(payload, (uint8_t*)kJsonObject, sizeof(kJsonObject) - 1);
    ASSERT_EQ(st.error_code, ErrorCode::Success);

    st = FinishPayloadWriter(payload);
    ASSERT_EQ(st.error_code, ErrorCode::Success);
    // The serialized buffer must be non-empty and report both rows.
    auto cb = GetPayloadBufferFromWriter(payload);
    ASSERT_GT(cb.length, 0);
    ASSERT_NE(cb.data, nullptr);
    auto nums = GetPayloadLengthFromWriter(payload);
    ASSERT_EQ(nums, 2);

    // Read the buffer back as a JSON column and verify the row count survives.
    CPayloadReader reader;
    st = NewPayloadReader(int(milvus::DataType::JSON), (uint8_t*)cb.data, cb.length, &reader);
    ASSERT_EQ(st.error_code, ErrorCode::Success);
    int length = GetPayloadLengthFromReader(reader);
    ASSERT_EQ(length, 2);

    ReleasePayloadWriter(payload);
    ReleasePayloadReader(reader);
}

TEST(storage, binary_vector) {
int DIM = 16;
auto payload = NewVectorPayloadWriter(int(milvus::DataType::VECTOR_BINARY), DIM);
Expand Down
6 changes: 2 additions & 4 deletions internal/core/unittest/test_utils/DataGen.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,8 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42, uint64_t ts_offset = 0,
case DataType::JSON: {
vector<std::string> data(N);
for (int i = 0; i < N / repeat_count; i++) {
auto str = R"({"int":)" + std::to_string(er()) +
R"(,"double":)" +
std::to_string(static_cast<double>(er())) +
R"(,"string":")" + std::to_string(er()) +
auto str = R"({"int":)" + std::to_string(er()) + R"(,"double":)" +
std::to_string(static_cast<double>(er())) + R"(,"string":")" + std::to_string(er()) +
R"(","bool": true)" + "}";
data[i] = str;
}
Expand Down
14 changes: 14 additions & 0 deletions internal/datanode/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,20 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
}
rst = data

case schemapb.DataType_JSON:
var data = &storage.JSONFieldData{
Data: make([][]byte, 0, len(content)),
}

for _, c := range content {
r, ok := c.([]byte)
if !ok {
return nil, errTransferType
}
data.Data = append(data.Data, r)
}
rst = data

case schemapb.DataType_FloatVector:
var data = &storage.FloatVectorFieldData{
Data: []float32{},
Expand Down
2 changes: 2 additions & 0 deletions internal/datanode/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
{true, schemapb.DataType_Float, []interface{}{float32(1), float32(2)}, "valid float32"},
{true, schemapb.DataType_Double, []interface{}{float64(1), float64(2)}, "valid float64"},
{true, schemapb.DataType_VarChar, []interface{}{"test1", "test2"}, "valid varChar"},
{true, schemapb.DataType_JSON, []interface{}{[]byte("{\"key\":\"value\"}"), []byte("{\"hello\":\"world\"}")}, "valid json"},
{true, schemapb.DataType_FloatVector, []interface{}{[]float32{1.0, 2.0}}, "valid floatvector"},
{true, schemapb.DataType_BinaryVector, []interface{}{[]byte{255}}, "valid binaryvector"},
{false, schemapb.DataType_Bool, []interface{}{1, 2}, "invalid bool"},
Expand All @@ -112,6 +113,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
{false, schemapb.DataType_Float, []interface{}{nil, nil}, "invalid float32"},
{false, schemapb.DataType_Double, []interface{}{nil, nil}, "invalid float64"},
{false, schemapb.DataType_VarChar, []interface{}{nil, nil}, "invalid varChar"},
{false, schemapb.DataType_JSON, []interface{}{nil, nil}, "invalid json"},
{false, schemapb.DataType_FloatVector, []interface{}{nil, nil}, "invalid floatvector"},
{false, schemapb.DataType_BinaryVector, []interface{}{nil, nil}, "invalid binaryvector"},
{false, schemapb.DataType_None, nil, "invalid data type"},
Expand Down
12 changes: 12 additions & 0 deletions internal/util/typeutil/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ func DeleteFieldData(dst []*schemapb.FieldData) {
dstScalar.GetDoubleData().Data = dstScalar.GetDoubleData().Data[:len(dstScalar.GetDoubleData().Data)-1]
case *schemapb.ScalarField_StringData:
dstScalar.GetStringData().Data = dstScalar.GetStringData().Data[:len(dstScalar.GetStringData().Data)-1]
case *schemapb.ScalarField_JsonData:
dstScalar.GetJsonData().Data = dstScalar.GetJsonData().Data[:len(dstScalar.GetJsonData().Data)-1]
default:
log.Error("wrong field type added", zap.String("field type", fieldData.Type.String()))
}
Expand Down Expand Up @@ -643,6 +645,16 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) {
} else {
dstScalar.GetStringData().Data = append(dstScalar.GetStringData().Data, srcScalar.StringData.Data...)
}
case *schemapb.ScalarField_JsonData:
if dstScalar.GetJsonData() == nil {
dstScalar.Data = &schemapb.ScalarField_JsonData{
JsonData: &schemapb.JSONArray{
Data: srcScalar.JsonData.Data,
},
}
} else {
dstScalar.GetJsonData().Data = append(dstScalar.GetJsonData().Data, srcScalar.JsonData.Data...)
}
default:
log.Error("Not supported field type", zap.String("field type", srcFieldData.Type.String()))
}
Expand Down
88 changes: 65 additions & 23 deletions internal/util/typeutil/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,32 +631,39 @@ func TestDeleteFieldData(t *testing.T) {
Int64FieldName = "Int64Field"
FloatFieldName = "FloatField"
DoubleFieldName = "DoubleField"
JSONFieldName = "JSONField"
BinaryVectorFieldName = "BinaryVectorField"
FloatVectorFieldName = "FloatVectorField"
BoolFieldID = common.StartOfUserFieldID + 1
Int32FieldID = common.StartOfUserFieldID + 2
Int64FieldID = common.StartOfUserFieldID + 3
FloatFieldID = common.StartOfUserFieldID + 4
DoubleFieldID = common.StartOfUserFieldID + 5
BinaryVectorFieldID = common.StartOfUserFieldID + 6
FloatVectorFieldID = common.StartOfUserFieldID + 7
)
const (
BoolFieldID = common.StartOfUserFieldID + iota
Int32FieldID
Int64FieldID
FloatFieldID
DoubleFieldID
JSONFieldID
BinaryVectorFieldID
FloatVectorFieldID
)

BoolArray := []bool{true, false}
Int32Array := []int32{1, 2}
Int64Array := []int64{11, 22}
FloatArray := []float32{1.0, 2.0}
DoubleArray := []float64{11.0, 22.0}
JSONArray := [][]byte{[]byte("{\"hello\":0}"), []byte("{\"key\":1}")}
BinaryVector := []byte{0x12, 0x34}
FloatVector := []float32{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 11.0, 22.0, 33.0, 44.0, 55.0, 66.0, 77.0, 88.0}

result1 := make([]*schemapb.FieldData, 7)
result2 := make([]*schemapb.FieldData, 7)
result1 := make([]*schemapb.FieldData, 8)
result2 := make([]*schemapb.FieldData, 8)
var fieldDataArray1 []*schemapb.FieldData
fieldDataArray1 = append(fieldDataArray1, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(Int32FieldName, Int32FieldID, schemapb.DataType_Int32, Int32Array[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(Int64FieldName, Int64FieldID, schemapb.DataType_Int64, Int64Array[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(FloatFieldName, FloatFieldID, schemapb.DataType_Float, FloatArray[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(DoubleFieldName, DoubleFieldID, schemapb.DataType_Double, DoubleArray[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(JSONFieldName, JSONFieldID, schemapb.DataType_JSON, JSONArray[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(BinaryVectorFieldName, BinaryVectorFieldID, schemapb.DataType_BinaryVector, BinaryVector[0:Dim/8], Dim))
fieldDataArray1 = append(fieldDataArray1, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[0:Dim], Dim))

Expand All @@ -666,30 +673,33 @@ func TestDeleteFieldData(t *testing.T) {
fieldDataArray2 = append(fieldDataArray2, genFieldData(Int64FieldName, Int64FieldID, schemapb.DataType_Int64, Int64Array[1:2], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(FloatFieldName, FloatFieldID, schemapb.DataType_Float, FloatArray[1:2], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(DoubleFieldName, DoubleFieldID, schemapb.DataType_Double, DoubleArray[1:2], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(JSONFieldName, JSONFieldID, schemapb.DataType_JSON, JSONArray[1:2], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(BinaryVectorFieldName, BinaryVectorFieldID, schemapb.DataType_BinaryVector, BinaryVector[Dim/8:2*Dim/8], Dim))
fieldDataArray2 = append(fieldDataArray2, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[Dim:2*Dim], Dim))

AppendFieldData(result1, fieldDataArray1, 0)
AppendFieldData(result1, fieldDataArray2, 0)
DeleteFieldData(result1)
assert.Equal(t, BoolArray[0:1], result1[0].GetScalars().GetBoolData().Data)
assert.Equal(t, Int32Array[0:1], result1[1].GetScalars().GetIntData().Data)
assert.Equal(t, Int64Array[0:1], result1[2].GetScalars().GetLongData().Data)
assert.Equal(t, FloatArray[0:1], result1[3].GetScalars().GetFloatData().Data)
assert.Equal(t, DoubleArray[0:1], result1[4].GetScalars().GetDoubleData().Data)
assert.Equal(t, BinaryVector[0:Dim/8], result1[5].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector)
assert.Equal(t, FloatVector[0:Dim], result1[6].GetVectors().GetFloatVector().Data)
assert.Equal(t, BoolArray[0:1], result1[BoolFieldID-common.StartOfUserFieldID].GetScalars().GetBoolData().Data)
assert.Equal(t, Int32Array[0:1], result1[Int32FieldID-common.StartOfUserFieldID].GetScalars().GetIntData().Data)
assert.Equal(t, Int64Array[0:1], result1[Int64FieldID-common.StartOfUserFieldID].GetScalars().GetLongData().Data)
assert.Equal(t, FloatArray[0:1], result1[FloatFieldID-common.StartOfUserFieldID].GetScalars().GetFloatData().Data)
assert.Equal(t, DoubleArray[0:1], result1[DoubleFieldID-common.StartOfUserFieldID].GetScalars().GetDoubleData().Data)
assert.Equal(t, JSONArray[0:1], result1[JSONFieldID-common.StartOfUserFieldID].GetScalars().GetJsonData().Data)
assert.Equal(t, BinaryVector[0:Dim/8], result1[BinaryVectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector)
assert.Equal(t, FloatVector[0:Dim], result1[FloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetFloatVector().Data)

AppendFieldData(result2, fieldDataArray2, 0)
AppendFieldData(result2, fieldDataArray1, 0)
DeleteFieldData(result2)
assert.Equal(t, BoolArray[1:2], result2[0].GetScalars().GetBoolData().Data)
assert.Equal(t, Int32Array[1:2], result2[1].GetScalars().GetIntData().Data)
assert.Equal(t, Int64Array[1:2], result2[2].GetScalars().GetLongData().Data)
assert.Equal(t, FloatArray[1:2], result2[3].GetScalars().GetFloatData().Data)
assert.Equal(t, DoubleArray[1:2], result2[4].GetScalars().GetDoubleData().Data)
assert.Equal(t, BinaryVector[Dim/8:2*Dim/8], result2[5].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector)
assert.Equal(t, FloatVector[Dim:2*Dim], result2[6].GetVectors().GetFloatVector().Data)
assert.Equal(t, BoolArray[1:2], result2[BoolFieldID-common.StartOfUserFieldID].GetScalars().GetBoolData().Data)
assert.Equal(t, Int32Array[1:2], result2[Int32FieldID-common.StartOfUserFieldID].GetScalars().GetIntData().Data)
assert.Equal(t, Int64Array[1:2], result2[Int64FieldID-common.StartOfUserFieldID].GetScalars().GetLongData().Data)
assert.Equal(t, FloatArray[1:2], result2[FloatFieldID-common.StartOfUserFieldID].GetScalars().GetFloatData().Data)
assert.Equal(t, DoubleArray[1:2], result2[DoubleFieldID-common.StartOfUserFieldID].GetScalars().GetDoubleData().Data)
assert.Equal(t, JSONArray[1:2], result2[JSONFieldID-common.StartOfUserFieldID].GetScalars().GetJsonData().Data)
assert.Equal(t, BinaryVector[Dim/8:2*Dim/8], result2[BinaryVectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector)
assert.Equal(t, FloatVector[Dim:2*Dim], result2[FloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetFloatVector().Data)
}

func TestGetPrimaryFieldSchema(t *testing.T) {
Expand Down Expand Up @@ -1011,3 +1021,35 @@ func TestCalcColumnSize(t *testing.T) {
assert.Equal(t, expected, size, field.GetName())
}
}

func TestMergeFieldData(t *testing.T) {
	// Destination columns: an int64 scalar, a float vector and a JSON field,
	// each pre-populated with a few rows.
	dstFields := []*schemapb.FieldData{
		genFieldData("int64", 100, schemapb.DataType_Int64, []int64{1, 2, 3}, 1),
		genFieldData("vector", 101, schemapb.DataType_FloatVector, []float32{1, 2, 3}, 1),
		genFieldData("json", 102, schemapb.DataType_JSON, [][]byte{[]byte(`{"key":"value"}`), []byte(`{"hello":"world"}`)}, 1),
	}

	// Source columns mirror the destination schema with different row values.
	srcFields := []*schemapb.FieldData{
		genFieldData("int64", 100, schemapb.DataType_Int64, []int64{4, 5, 6}, 1),
		genFieldData("vector", 101, schemapb.DataType_FloatVector, []float32{4, 5, 6}, 1),
		genFieldData("json", 102, schemapb.DataType_JSON, [][]byte{[]byte(`{"key":"value"}`), []byte(`{"hello":"world"}`)}, 1),
	}

	MergeFieldData(dstFields, srcFields)

	// Each destination column must now hold dst rows followed by src rows.
	assert.Equal(t, []int64{1, 2, 3, 4, 5, 6}, dstFields[0].GetScalars().GetLongData().Data)
	assert.Equal(t, []float32{1, 2, 3, 4, 5, 6}, dstFields[1].GetVectors().GetFloatVector().Data)
	assert.Equal(t, [][]byte{[]byte(`{"key":"value"}`), []byte(`{"hello":"world"}`), []byte(`{"key":"value"}`), []byte(`{"hello":"world"}`)},
		dstFields[2].GetScalars().GetJsonData().Data)

	emptyField := &schemapb.FieldData{
		Type: schemapb.DataType_None,
		Field: &schemapb.FieldData_Scalars{
			Scalars: &schemapb.ScalarField{
				Data: nil,
			},
		},
	}

	// A field whose scalar payload is nil hits MergeFieldData's default
	// (unsupported-type) branch; assert explicitly that this does not panic
	// instead of relying on the test merely finishing.
	assert.NotPanics(t, func() {
		MergeFieldData([]*schemapb.FieldData{emptyField}, []*schemapb.FieldData{emptyField})
	})
}

0 comments on commit 1eb804f

Please sign in to comment.