Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proto schema registry refresh optimizations #1040

Merged
merged 6 commits into from
Feb 5, 2024

Conversation

bojand
Copy link
Member

@bojand bojand commented Jan 26, 2024

Iterative work for optimizing some of the proto schema registry refreshing.

Changes

  • In schema Service we not keep a map of proto schemas, and proto file descriptors indexed by schema ID.
  • As we refresh the schemas, we determine which ones are new or have been updated, and merge those into the index. We only parse and compile these updated / new ones.
  • We also ensure that the removed ones are removed from the mapping.
  • Changes the returned types of GetSchemasBySubject(), GetSchemas(), GetSchemasIndividually() to return pointer types.
  • gci seems to incorrectly sort the "maps" import. Updated the tooling hoping it would address the issue, but it didn't seem to help, so I commented that step out for now.

I will try and add some tests for this soon.

@weeco weeco self-requested a review January 26, 2024 23:24
Comment on lines 701 to 702
errors := make([]error, len(subjectsRes.Subjects))
hasErrors := atomic.Bool{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should do errors := make([]error, 0, len(subjectsRes.Subjects)) and then we can check the length of errors to see if we have errors. I think in most cases we do not expect every single subject to return an error.

It feels a bit weird to return a slice of schemas and a slice of errors. At this point we can probably just refactor the function to return schema + error via a channel and handle the error and returned schema on the receiving side.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent is to be able to get all the schemas we successfully fetched, even if we failed to retrieve some subset of all the schemas in SR. So we return all the successfully fetched schemas and a collection of errors.
This is the case for both GetSchemasIndividually() and GetSchemas(), and then GetProtoDescriptors() we are able to log all the failed schemas, but still be able to parse and compile the successfully retrieved ones so we could potentially use them.

s.requestGroup.Forget(key)
return nil, fmt.Errorf("failed to get schema from registry: %w", err)
_, err, _ := s.requestGroup.Do(key, func() (any, error) {
schemasRes, errs := s.registryClient.GetSchemas(ctx, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

schemaRes will be nil as far as I can tell if all retrieved schemas errored (e.g. schema registry not reachable). I think this func will still work if this is the case, but would be good to test the case where schemaRes is nil.

Rather than using singleflight we could use https://github.com/twmb/go-cache to simplify some things, but I assume you tried to keep as much of the original code as possible

@bojand bojand merged commit 50874a7 into master Feb 5, 2024
8 checks passed
@bojand bojand deleted the backend/proto_refresh_optimizations branch February 5, 2024 20:03
@bojand bojand restored the backend/proto_refresh_optimizations branch April 12, 2024 00:25
@bojand bojand deleted the backend/proto_refresh_optimizations branch April 12, 2024 00:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants