Add Schema Registry support #776

Merged
merged 41 commits into from
Jul 1, 2022

@rayokota (Member) commented May 4, 2022

This PR adds support for Schema Registry to the golang client. Much of the low-level SR client code has been adapted from #231. There is a new Mock SR client that has been added. All the serdes are new. The following serdes are supported:

Both unit tests and runnable examples have been added. All the runnable examples have been tested locally.

value := MyRecord{
ProductName: "Hello!",
}
payload, err := ser.Serialize(topic, value)

Contributor

Oh interesting - you've decoupled the serdes from the produce call. Not how it's done elsewhere, but I feel this approach is ok. The primary downside I can think of is that you don't get the original record in the context of the delivery report (just the serialized payload). The upside is it's much easier to implement.

@mhowlett (Contributor) May 5, 2022

I guess not having generics forces you to do this?

Some more disadvantages:

  • Explicit serialization/deserialization is more keystrokes for the user.
  • When instantiating the serializer/deserializer, the user needs to specify key or value. They then implicitly specify this again when passing the payload into the produce call, which is redundant and not ideal.
  • The serialize call is potentially blocking due to the HTTP call, whereas the produce call isn't (or shouldn't be IMO; I'll need to check how that's set up in the golang client case). I think you ideally want the entire produce operation to be async, and to achieve that the user would need to write some additional code. This may not matter in the goroutine paradigm though (as compared to async/await, where you typically never want to block a thread).
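
To illustrate the "additional code" mentioned in the last bullet, here is a minimal sketch of moving a potentially blocking serialize call into a goroutine. Only the `Serialize(topic, value)` call shape comes from the snippet under review; the `Serializer` interface, `slowSerializer`, `result`, and `serializeAsync` are hypothetical names introduced for this example and are not part of the PR.

```go
package main

import (
	"fmt"
	"time"
)

// Serializer is a hypothetical stand-in for the serializer under review.
// Only the Serialize call shape is taken from the snippet in this thread.
type Serializer interface {
	Serialize(topic string, msg interface{}) ([]byte, error)
}

// slowSerializer simulates a serializer whose call blocks, for example
// on an HTTP round trip to Schema Registry.
type slowSerializer struct{}

func (slowSerializer) Serialize(topic string, msg interface{}) ([]byte, error) {
	time.Sleep(10 * time.Millisecond) // pretend network latency
	return []byte(fmt.Sprintf("%v", msg)), nil
}

// result carries the outcome of one serialization.
type result struct {
	payload []byte
	err     error
}

// serializeAsync runs Serialize in its own goroutine and returns a
// channel that yields exactly one result.
func serializeAsync(ser Serializer, topic string, msg interface{}) <-chan result {
	out := make(chan result, 1) // buffered so the goroutine can always finish
	go func() {
		payload, err := ser.Serialize(topic, msg)
		out <- result{payload: payload, err: err}
	}()
	return out
}

func main() {
	pending := serializeAsync(slowSerializer{}, "my-topic", "Hello!")
	// The caller is free to do other work here.
	r := <-pending
	if r.err != nil {
		fmt.Println("serialize failed:", r.err)
		return
	}
	// In a real program r.payload would be handed to the producer here.
	fmt.Printf("%s\n", r.payload)
}
```

Whether this is worth the extra code depends on how often serialization actually reaches the registry; the sketch makes no assumption about caching inside the serializer.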

@rayokota (Member Author) commented May 5, 2022

Some design notes for reviewers:

This PR differs from #231 in that the serdes have not been fully integrated with the producer/consumer. That is still possible, but I wanted to see if it made sense before I proceeded with that additional change. If we want that additional change, I can make it in this PR or in a subsequent one.

There is one benefit of not fully integrating the serdes with the producer/consumer, and that is it will make it simpler to specify the object to hold the deserialized result if the deserializers are used outside the producer/consumer. Right now the deserializers look like

// Deserializer represents a deserializer
type Deserializer interface {
	Configure(conf *kafka.ConfigMap, isKey bool) error
	// Deserialize will call the MessageFactory to create an object
	// into which we will unmarshal data.
	Deserialize(topic string, payload []byte) (interface{}, error)
	// DeserializeInto will unmarshal data into the given object.
	DeserializeInto(topic string, payload []byte, msg interface{}) error
	MessageFactory() MessageFactory
	SetMessageFactory(factory MessageFactory)
	Close()
}

There are two ways to use the deserializer. One is to call DeserializeInto and pass the object that will hold the deserialized result. The other is to call Deserialize, in which case the deserializer must be set up with a MessageFactory, which is essentially a callback that supplies the object to hold the deserialized result.

For Protobuf, there is a natural MessageFactory built into the libraries that can achieve this, so Deserialize is more natural with Protobuf. For Avro/JSON Schema, there is no such factory, so a MessageFactory needs to be specified manually before using Deserialize, or instead one can just use DeserializeInto.
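
The two usage patterns described above can be sketched with a toy JSON deserializer. This is an illustration only: `jsonDeserializer` is a hypothetical type built on `encoding/json`, and the `MessageFactory` signature used here is a simplified assumption, since the thread does not show the real one.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// MessageFactory is a simplified, hypothetical callback that returns a
// pointer to the object that should receive the deserialized data.
type MessageFactory func(topic string) (interface{}, error)

// jsonDeserializer is a toy stand-in for the deserializers in this PR.
type jsonDeserializer struct {
	factory MessageFactory
}

// DeserializeInto unmarshals the payload into the object supplied by the caller.
func (d *jsonDeserializer) DeserializeInto(topic string, payload []byte, msg interface{}) error {
	return json.Unmarshal(payload, msg)
}

// Deserialize asks the factory for a target object, then unmarshals into it.
func (d *jsonDeserializer) Deserialize(topic string, payload []byte) (interface{}, error) {
	if d.factory == nil {
		return nil, errors.New("no MessageFactory set")
	}
	msg, err := d.factory(topic)
	if err != nil {
		return nil, err
	}
	if err := json.Unmarshal(payload, msg); err != nil {
		return nil, err
	}
	return msg, nil
}

// MyRecord mirrors the record type used in the snippet under review.
type MyRecord struct {
	ProductName string
}

func main() {
	payload := []byte(`{"ProductName":"Hello!"}`)
	d := &jsonDeserializer{}

	// Pattern 1: the caller provides the target object.
	var rec MyRecord
	if err := d.DeserializeInto("my-topic", payload, &rec); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(rec.ProductName)

	// Pattern 2: a factory provides the target object on demand.
	d.factory = func(topic string) (interface{}, error) {
		return &MyRecord{}, nil
	}
	msg, err := d.Deserialize("my-topic", payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(msg.(*MyRecord).ProductName)
}
```

With the factory pattern the caller needs a type assertion on the returned value, which is part of the trade-off being discussed.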

If we choose not to integrate the serdes with the producer/consumer, then the interfaces can be changed, perhaps to be more idiomatic. As @mhowlett has noticed, we could replace

	ser := schemaregistry.SpecificAvroSerializer{}
	err = ser.Configure(&kafka.ConfigMap{
		"schema.registry.url": url,
	}, false)

with

	ser := schemaregistry.NewSpecificAvroSerializer(&kafka.ConfigMap{
		"schema.registry.url": url,
	}, false)

or get rid of the ConfigMap and just use a regular map. The ConfigMap is really there in case we do want to integrate the serdes with the producer/consumer in the future.

@mhowlett @edenhill Thoughts?
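
For reference, the constructor-style alternative discussed above might look roughly like the following. This is a sketch under assumptions: `avroSerializer` and `newAvroSerializer` are hypothetical names, and a plain `map[string]string` is used in place of `kafka.ConfigMap` so the example stays self-contained. It is not the code from this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// avroSerializer is a hypothetical stand-in, not the type from this PR.
type avroSerializer struct {
	url   string
	isKey bool
}

// Configure validates the settings and stores them on the serializer.
func (s *avroSerializer) Configure(conf map[string]string, isKey bool) error {
	url, ok := conf["schema.registry.url"]
	if !ok || url == "" {
		return errors.New("schema.registry.url is required")
	}
	s.url, s.isKey = url, isKey
	return nil
}

// newAvroSerializer combines allocation and configuration, so callers
// never hold a serializer that has not been configured.
func newAvroSerializer(conf map[string]string, isKey bool) (*avroSerializer, error) {
	s := &avroSerializer{}
	if err := s.Configure(conf, isKey); err != nil {
		return nil, err
	}
	return s, nil
}

func main() {
	ser, err := newAvroSerializer(map[string]string{
		"schema.registry.url": "http://localhost:8081", // placeholder URL
	}, false)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ser.url, ser.isKey)
}
```

Returning an error from the constructor is one possible design; the proposal in the comment above does not say how configuration errors would be reported.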

@robloxtq

How does this handle .proto definitions with imports that are import "foo/bar/baz.proto"?
As far as I can tell, the resolution that happens here cannot handle that as the subjects are not path escaped.
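
To make the concern concrete, here is a small sketch of what path-escaping a subject name looks like with Go's standard library. The helper `subjectPath` is hypothetical, and the `/subjects/{subject}/versions` layout is assumed from the Schema Registry REST API; it is not taken from the code in this PR.

```go
package main

import (
	"fmt"
	"net/url"
)

// subjectPath builds a request path for a subject's versions, escaping
// the subject so that any "/" in it stays within one path segment.
func subjectPath(subject string) string {
	return fmt.Sprintf("/subjects/%s/versions", url.PathEscape(subject))
}

func main() {
	// Without escaping, the slashes would be read as path separators.
	fmt.Println(subjectPath("foo/bar/baz.proto"))
	// prints "/subjects/foo%2Fbar%2Fbaz.proto/versions"
}
```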

@rayokota (Member Author)

How does this handle .proto definitions with imports that are import "foo/bar/baz.proto"? As far as I can tell, the resolution that happens here cannot handle that as the subjects are not path escaped.

@tquachbot, good point, I'll take a look.

@mhowlett (Contributor) left a comment

One last round of minor comments; with that, LGTM.

@emasab (Contributor) left a comment

LGTM. Thanks Robert!


// handleRequest sends a HTTP(S) request to the Schema Registry, placing results into the response object
func (rs *restService) handleRequest(request *api, response interface{}) error {
	endpoint, err := rs.url.Parse(fmt.Sprintf(base+request.endpoint, request.arguments...))

Hi @emasab, @edenhill,

Curious why this is done. It seems to break for Schema Registry servers that are located at a specific path, not just a hostname. For example, if my Schema Registry endpoint is at "https://foo.bar/baz" and I want to call the GET "subjects" API, this will send the request to host https://foo.bar with path "/subjects" instead of "/baz/subjects".

Was this an attempt to trim a potentially trailing slash, i.e. to change "https://foo.bar/baz/" to "https://foo.bar/baz"?
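
The behavior described above can be reproduced with the standard library alone. The sketch below assumes the formatted endpoint starts with "/" (the value of `base` is not shown in this thread); `resolveWithParse` and `resolveWithJoin` are hypothetical helpers written for this example, and the second one is only one possible way to keep the base path.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// resolveWithParse resolves endpoint against baseURL using URL.Parse.
// A reference with an absolute path replaces the base path entirely.
func resolveWithParse(baseURL, endpoint string) (string, error) {
	base, err := url.Parse(baseURL)
	if err != nil {
		return "", err
	}
	u, err := base.Parse(endpoint)
	if err != nil {
		return "", err
	}
	return u.String(), nil
}

// resolveWithJoin appends endpoint to the base path instead, so a
// registry mounted under a sub-path keeps its prefix.
func resolveWithJoin(baseURL, endpoint string) (string, error) {
	base, err := url.Parse(baseURL)
	if err != nil {
		return "", err
	}
	base.Path = path.Join(base.Path, endpoint) // also collapses duplicate slashes
	return base.String(), nil
}

func main() {
	parsed, err := resolveWithParse("https://foo.bar/baz", "/subjects")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(parsed) // prints "https://foo.bar/subjects"

	joined, err := resolveWithJoin("https://foo.bar/baz/", "/subjects")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(joined) // prints "https://foo.bar/baz/subjects"
}
```

Note that `path.Join` works on the decoded path, so endpoints that contain percent-escaped segments would need extra care; the sketch only covers plain endpoints.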

}
}

func ignoreFile(name string) bool {

Hi @emasab,

Curious why these are ignored?

5 participants