Skip to content

Commit

Permalink
Fix JSON validation during serialization (#1101)
Browse files Browse the repository at this point in the history
* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* Fix JSON validation during serialization

---------

Co-authored-by: Confluent Jenkins Bot <jenkins@confluent.io>
  • Loading branch information
rayokota and ConfluentJenkins authored Nov 9, 2023
1 parent 5f4c973 commit fed30be
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 12 deletions.
2 changes: 1 addition & 1 deletion schemaregistry/serde/avro/avro_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *GenericSerializer) Serialize(topic string, msg interface{}) ([]byte, er
info := schemaregistry.SchemaInfo{
Schema: avroType.String(),
}
id, err := s.GetID(topic, msg, info)
id, err := s.GetID(topic, msg, &info)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion schemaregistry/serde/avro/avro_specific.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *SpecificSerializer) Serialize(topic string, msg interface{}) ([]byte, e
info := schemaregistry.SchemaInfo{
Schema: avroMsg.Schema(),
}
id, err := s.GetID(topic, avroMsg, info)
id, err := s.GetID(topic, avroMsg, &info)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion schemaregistry/serde/jsonschema/json_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) {
Schema: string(raw),
SchemaType: "JSON",
}
id, err := s.GetID(topic, msg, info)
id, err := s.GetID(topic, msg, &info)

This comment has been minimized.

Copy link
@isaacrivas-ph

isaacrivas-ph Apr 3, 2024

What is the objective of the Schema Registry Client if the Schema is inferred from the message object and it is later validated against itself?
I would like to confirm if the objective of this Serializer is to use the Schema of the provided Topic to validate the message data against that Schema.

if err != nil {
return nil, err
}
Expand Down
60 changes: 60 additions & 0 deletions schemaregistry/serde/jsonschema/json_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package jsonschema

import (
"encoding/json"
"github.com/invopop/jsonschema"
"strings"
"testing"

"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
Expand Down Expand Up @@ -85,6 +88,63 @@ func TestJSONSchemaSerdeWithNested(t *testing.T) {
serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj))
}

// TestFailingJSONSchemaValidationWithSimple exercises serializer-side JSON
// schema validation against a schema that is already registered.
//
// It registers the schema reflected from JSONDemoSchema under the subject
// "topic1-value" on a mock registry, then serializes to "topic1" with
// validation enabled, auto-registration disabled and UseLatestVersion set.
// Serializing a JSONDemoSchema value must succeed; serializing a
// DifferentJSONDemoSchema value must fail with an error that mentions
// "jsonschema".
func TestFailingJSONSchemaValidationWithSimple(t *testing.T) {
	serde.MaybeFail = serde.InitFailFunc(t)
	conf := schemaregistry.NewConfig("mock://")

	client, err := schemaregistry.NewClient(conf)
	serde.MaybeFail("Schema Registry configuration", err)

	serConfig := NewSerializerConfig()
	serConfig.EnableValidation = true
	// We don't want to risk registering one instead of using the already registered one
	serConfig.AutoRegisterSchemas = false
	serConfig.UseLatestVersion = true
	ser, err := NewSerializer(client, serde.ValueSerde, serConfig)
	serde.MaybeFail("Serializer configuration", err)

	// Build the schema to register from the zero value of JSONDemoSchema.
	obj := JSONDemoSchema{}
	jschema := jsonschema.Reflect(obj)
	raw, err := json.Marshal(jschema)
	serde.MaybeFail("Schema marshalling", err)
	info := schemaregistry.SchemaInfo{
		Schema:     string(raw),
		SchemaType: "JSON",
	}

	// NOTE(review): "topic1-value" is presumably the subject the serializer
	// resolves for values written to "topic1" — confirm against the
	// configured subject name strategy.
	id, err := client.Register("topic1-value", info, false)
	serde.MaybeFail("Schema registration", err)
	if id <= 0 {
		t.Errorf("Expected valid schema id, found %d", id)
	}

	// A value of the registered type is expected to pass validation.
	_, err = ser.Serialize("topic1", &obj)
	if err != nil {
		t.Errorf("Expected no validation error, found %s", err)
	}

	// A value of a different type is expected to be rejected.
	diffObj := DifferentJSONDemoSchema{}
	_, err = ser.Serialize("topic1", &diffObj)
	if err == nil || !strings.Contains(err.Error(), "jsonschema") {
		// err may be nil on this branch; %v prints "<nil>" where %s would
		// print "%!s(<nil>)".
		t.Errorf("Expected validation error, found %v", err)
	}
}

// DifferentJSONDemoSchema is a test fixture serialized by
// TestFailingJSONSchemaValidationWithSimple, which expects the serializer to
// reject it with a validation error.
// NOTE(review): presumably it is meant to deviate from JSONDemoSchema through
// ExtraStringField and the string-typed "BoolField" — confirm against the
// full JSONDemoSchema definition.
type DifferentJSONDemoSchema struct {
IntField int32 `json:"IntField"`

// ExtraStringField is encoded under the JSON key "ExtraStringField".
ExtraStringField string `json:"ExtraStringField"`

DoubleField float64 `json:"DoubleField"`

StringField string `json:"StringField"`

// BoolFieldThatsActuallyString is encoded under the JSON key "BoolField"
// but carries a string value.
BoolFieldThatsActuallyString string `json:"BoolField"`

BytesField test.Bytes `json:"BytesField"`
}

type JSONDemoSchema struct {
IntField int32 `json:"IntField"`

Expand Down
2 changes: 1 addition & 1 deletion schemaregistry/serde/protobuf/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) {
SchemaType: metadata.SchemaType,
References: metadata.References,
}
id, err := s.GetID(topic, protoMsg, info)
id, err := s.GetID(topic, protoMsg, &info)
if err != nil {
return nil, err
}
Expand Down
16 changes: 8 additions & 8 deletions schemaregistry/serde/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,28 +128,28 @@ func TopicNameStrategy(topic string, serdeType Type, schema schemaregistry.Schem
}

// GetID returns a schema ID for the given schema
func (s *BaseSerializer) GetID(topic string, msg interface{}, info schemaregistry.SchemaInfo) (int, error) {
func (s *BaseSerializer) GetID(topic string, msg interface{}, info *schemaregistry.SchemaInfo) (int, error) {
autoRegister := s.Conf.AutoRegisterSchemas
useSchemaID := s.Conf.UseSchemaID
useLatest := s.Conf.UseLatestVersion
normalizeSchema := s.Conf.NormalizeSchemas

var id = -1
subject, err := s.SubjectNameStrategy(topic, s.SerdeType, info)
subject, err := s.SubjectNameStrategy(topic, s.SerdeType, *info)
if err != nil {
return -1, err
}
if autoRegister {
id, err = s.Client.Register(subject, info, normalizeSchema)
id, err = s.Client.Register(subject, *info, normalizeSchema)
if err != nil {
return -1, err
}
} else if useSchemaID >= 0 {
info, err = s.Client.GetBySubjectAndID(subject, useSchemaID)
*info, err = s.Client.GetBySubjectAndID(subject, useSchemaID)
if err != nil {
return -1, err
}
id, err = s.Client.GetID(subject, info, false)
id, err = s.Client.GetID(subject, *info, false)
if err != nil {
return -1, err
}
Expand All @@ -161,17 +161,17 @@ func (s *BaseSerializer) GetID(topic string, msg interface{}, info schemaregistr
if err != nil {
return -1, err
}
info = schemaregistry.SchemaInfo{
*info = schemaregistry.SchemaInfo{
Schema: metadata.Schema,
SchemaType: metadata.SchemaType,
References: metadata.References,
}
id, err = s.Client.GetID(subject, info, false)
id, err = s.Client.GetID(subject, *info, false)
if err != nil {
return -1, err
}
} else {
id, err = s.Client.GetID(subject, info, normalizeSchema)
id, err = s.Client.GetID(subject, *info, normalizeSchema)
if err != nil {
return -1, err
}
Expand Down

0 comments on commit fed30be

Please sign in to comment.