Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proto schema registry refresh optimizations #1040

Merged
merged 6 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions backend/pkg/schema/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,19 @@ func (c *Client) GetSchemaBySubject(ctx context.Context, subject, version string
}

// GetSchemasBySubject gets all versioned schemas for a given subject.
func (c *Client) GetSchemasBySubject(ctx context.Context, subject string, showSoftDeleted bool) ([]SchemaVersionedResponse, error) {
func (c *Client) GetSchemasBySubject(ctx context.Context, subject string, showSoftDeleted bool) ([]*SchemaVersionedResponse, error) {
versionRes, err := c.GetSubjectVersions(ctx, subject, showSoftDeleted)
if err != nil {
return nil, err
}

results := []SchemaVersionedResponse{}
results := []*SchemaVersionedResponse{}
for _, sv := range versionRes.Versions {
sr, err := c.GetSchemaBySubject(ctx, subject, strconv.FormatInt(int64(sv), 10), showSoftDeleted)
if err != nil {
return nil, fmt.Errorf("failed to get schema by subject %s and version %d", subject, sv)
}
results = append(results, *sr)
results = append(results, sr)
}

return results, nil
Expand Down Expand Up @@ -595,8 +595,8 @@ func (c *Client) GetSchemaTypes(ctx context.Context) ([]string, error) {
}

// GetSchemas retrieves all stored schemas from a schema registry.
func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]SchemaVersionedResponse, error) {
var schemas []SchemaVersionedResponse
func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]*SchemaVersionedResponse, []error) {
var schemas []*SchemaVersionedResponse
req := c.client.R().
SetContext(ctx).
SetResult(&schemas)
Expand All @@ -607,7 +607,7 @@ func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]Schema

res, err := req.Get("/schemas")
if err != nil {
return nil, fmt.Errorf("get schemas failed: %w", err)
return nil, []error{fmt.Errorf("get schemas failed: %w", err)}
}

if res.StatusCode() == http.StatusNotFound {
Expand All @@ -619,9 +619,9 @@ func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]Schema
if res.IsError() {
restErr, ok := res.Error().(*RestError)
if !ok {
return nil, fmt.Errorf("get schemas failed: Status code %d", res.StatusCode())
return nil, []error{fmt.Errorf("get schemas failed: Status code %d", res.StatusCode())}
}
return nil, restErr
return nil, []error{restErr}
}

return schemas, nil
Expand Down Expand Up @@ -687,38 +687,44 @@ func (c *Client) CreateSchema(ctx context.Context, subjectName string, schema Sc

// GetSchemasIndividually returns all schemas by describing all schemas one by one. This may be used against
// schema registry that don't support the /schemas endpoint that returns a list of all registered schemas.
func (c *Client) GetSchemasIndividually(ctx context.Context, showSoftDeleted bool) ([]SchemaVersionedResponse, error) {
func (c *Client) GetSchemasIndividually(ctx context.Context, showSoftDeleted bool) ([]*SchemaVersionedResponse, []error) {
subjectsRes, err := c.GetSubjects(ctx, showSoftDeleted)
if err != nil {
return nil, fmt.Errorf("failed to get subjects to fetch schemas for: %w", err)
return nil, []error{fmt.Errorf("failed to get subjects to fetch schemas for: %w", err)}
}

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // limit max concurrency to 10 requests at a time

schemas := make([]SchemaVersionedResponse, 0, len(subjectsRes.Subjects))
schemas := make([]*SchemaVersionedResponse, 0, len(subjectsRes.Subjects))
errors := make([]error, 0, len(subjectsRes.Subjects))
mutex := sync.Mutex{}

for _, subject := range subjectsRes.Subjects {
subject := subject

g.Go(func() error {
srRes, err := c.GetSchemasBySubject(ctx, subject, showSoftDeleted)
if err != nil {
return err
mutex.Lock()
errors = append(errors, err)
mutex.Unlock()
return nil
}

mutex.Lock()
schemas = append(schemas, srRes...)
mutex.Unlock()
return err
return nil
})
}

// is this even possible?
if err := g.Wait(); err != nil {
return nil, err
return nil, []error{err}
}

return schemas, nil
return schemas, errors
}

// GetSchemaReferencesResponse is the response to fetching schema references.
Expand Down
118 changes: 98 additions & 20 deletions backend/pkg/schema/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ package schema
import (
"context"
"fmt"
"maps"
"strconv"
"strings"
"sync"
"time"

"github.com/hamba/avro/v2"
Expand Down Expand Up @@ -41,6 +43,11 @@ type Service struct {
// by subjects is needed to lookup references in avro schemas.
schemaBySubjectVersion *cache.Cache[string, *SchemaVersionedResponse]
avroSchemaByID *cache.Cache[uint32, avro.Schema]

// for protobuf schema refreshing and compiling
srRefreshMutex sync.RWMutex
protoSchemasByID map[int]*SchemaVersionedResponse
protoFDByID map[int]*desc.FileDescriptor
}

// NewService to access schema registry. Returns an error if connection can't be established.
Expand All @@ -67,35 +74,73 @@ func (s *Service) CheckConnectivity(ctx context.Context) error {

// GetProtoDescriptors returns all file descriptors in a map where the key is the schema id.
// The value is a set of file descriptors because each schema may references / imported proto schemas.
//
//nolint:gocognit,cyclop // complicated refresh logic
func (s *Service) GetProtoDescriptors(ctx context.Context) (map[int]*desc.FileDescriptor, error) {
// Singleflight makes sure to not run the function body if there are concurrent requests. We use this to avoid
// duplicate requests against the schema registry
key := "get-proto-descriptors"
v, err, _ := s.requestGroup.Do(key, func() (any, error) {
schemasRes, err := s.registryClient.GetSchemas(ctx, false)
if err != nil {
// If schema registry returns an error we want to retry it next time, so let's forget the key
s.requestGroup.Forget(key)
return nil, fmt.Errorf("failed to get schema from registry: %w", err)
_, err, _ := s.requestGroup.Do(key, func() (any, error) {
schemasRes, errs := s.registryClient.GetSchemas(ctx, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

schemaRes will be nil as far as I can tell if all retrieved schemas errored (e.g. schema registry not reachable). I think this func will still work if this is the case, but would be good to test the case where schemaRes is nil.

Rather than using singleflight we could use https://github.com/twmb/go-cache to simplify some things, but I assume you tried to keep as much of the original code as possible

if len(errs) > 0 {
for _, err := range errs {
s.logger.Error("failed to get schema from registry", zap.Error(err))
}

if len(schemasRes) == 0 {
return nil, nil
}
}

s.srRefreshMutex.Lock()
defer s.srRefreshMutex.Unlock()

if s.protoSchemasByID == nil {
s.protoSchemasByID = make(map[int]*SchemaVersionedResponse, len(schemasRes))
}

// collect existing schema IDs
existingSchemaIDs := make(map[int]struct{}, len(s.protoSchemasByID))
for id := range s.protoSchemasByID {
existingSchemaIDs[id] = struct{}{}
}

// 1. Index all returned schemas by their respective subject name and version as stored in the schema registry
schemasBySubjectAndVersion := make(map[string]map[int]SchemaVersionedResponse)
schemasToCompile := make([]*SchemaVersionedResponse, 0, len(schemasRes))

newSchemaIDs := make(map[int]struct{}, len(schemasRes))

// Index all returned schemas by their respective subject name and version as stored in the schema registry
// Collect the new or updated schemas to compile
schemasBySubjectAndVersion := make(map[string]map[int]*SchemaVersionedResponse)
for _, schema := range schemasRes {
schema := schema

if schema.Type != TypeProtobuf {
continue
}
_, exists := schemasBySubjectAndVersion[schema.Subject]
if !exists {
schemasBySubjectAndVersion[schema.Subject] = make(map[int]SchemaVersionedResponse)
schemasBySubjectAndVersion[schema.Subject] = make(map[int]*SchemaVersionedResponse)
}
schemasBySubjectAndVersion[schema.Subject][schema.Version] = schema

if existing, ok := s.protoSchemasByID[schema.SchemaID]; !ok || !strings.EqualFold(existing.Schema, schema.Schema) {
schemasToCompile = append(schemasToCompile, schema)
}

newSchemaIDs[schema.SchemaID] = struct{}{}

s.protoSchemasByID[schema.SchemaID] = schema
}

// 2. Compile each subject with each of it's references into one or more filedescriptors so that they can be
compileStart := time.Now()

// 2. Compile each subject with each of it's references into one or more file descriptors so that they can be
// registered in their own proto registry.
fdBySchemaID := make(map[int]*desc.FileDescriptor)
for _, schema := range schemasRes {
newFDBySchemaID := make(map[int]*desc.FileDescriptor, len(schemasToCompile))
for _, schema := range schemasToCompile {
schema := schema

if schema.Type != TypeProtobuf {
continue
}
Expand All @@ -108,24 +153,57 @@ func (s *Service) GetProtoDescriptors(ctx context.Context) (map[int]*desc.FileDe
zap.Error(err))
continue
}
fdBySchemaID[schema.SchemaID] = fd
newFDBySchemaID[schema.SchemaID] = fd
}

compileDuration := time.Since(compileStart)

// merge
if s.protoFDByID == nil {
s.protoFDByID = make(map[int]*desc.FileDescriptor, len(newFDBySchemaID))
}

maps.Copy(s.protoFDByID, newFDBySchemaID)

schemasDeleted := 0

// remove schemas only if no errors
if len(errs) == 0 {
schemasIDsToDelete := make([]int, 0, len(s.protoSchemasByID)/2)

for id := range existingSchemaIDs {
if _, ok := newSchemaIDs[id]; !ok {
schemasIDsToDelete = append(schemasIDsToDelete, id)
}
}

for _, id := range schemasIDsToDelete {
delete(s.protoSchemasByID, id)
delete(s.protoFDByID, id)
}

schemasDeleted = len(schemasIDsToDelete)
}

return fdBySchemaID, nil
s.logger.Info("compiled new schemas",
zap.Int("updated_schemas", len(schemasToCompile)),
zap.Int("deleted_schemas", schemasDeleted),
zap.Duration("compile_duration", compileDuration))

return nil, nil
})
if err != nil {
return nil, err
}

descriptors, ok := v.(map[int]*desc.FileDescriptor)
if !ok {
return nil, fmt.Errorf("failed to type assert file descriptors")
}
s.srRefreshMutex.RLock()
descriptors := maps.Clone(s.protoFDByID)
s.srRefreshMutex.RUnlock()

return descriptors, nil
}

func (s *Service) addReferences(schema SchemaVersionedResponse, schemaRepository map[string]map[int]SchemaVersionedResponse, schemasByPath map[string]string) error {
func (s *Service) addReferences(schema *SchemaVersionedResponse, schemaRepository map[string]map[int]*SchemaVersionedResponse, schemasByPath map[string]string) error {
for _, ref := range schema.References {
refSubject, exists := schemaRepository[ref.Subject]
if !exists {
Expand All @@ -147,7 +225,7 @@ func (s *Service) addReferences(schema SchemaVersionedResponse, schemaRepository
return nil
}

func (s *Service) compileProtoSchemas(schema SchemaVersionedResponse, schemaRepository map[string]map[int]SchemaVersionedResponse) (*desc.FileDescriptor, error) {
func (s *Service) compileProtoSchemas(schema *SchemaVersionedResponse, schemaRepository map[string]map[int]*SchemaVersionedResponse) (*desc.FileDescriptor, error) {
// 1. Let's find the references for each schema and put the references' schemas into our in memory filesystem.
schemasByPath := make(map[string]string)
schemasByPath[schema.Subject] = schema.Schema
Expand Down
8 changes: 5 additions & 3 deletions taskfiles/backend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ tasks:

install-gofumpt:
vars:
GOFUMPT_VERSION: 0.4.0
GOFUMPT_VERSION: 0.5.0
desc: install gofumpt go formater
deps:
- install-go
Expand All @@ -50,7 +50,7 @@ tasks:

install-gci:
vars:
GCI_VERSION: 0.9.0
GCI_VERSION: 0.12.1
desc: install gci
deps:
- install-go
Expand Down Expand Up @@ -108,7 +108,9 @@ tasks:
cmds:
- '{{ .BUILD_ROOT }}/bin/go/goimports -l -w -local "github.com/redpanda-data/console/backend" .'
- '{{ .BUILD_ROOT }}/bin/go/gofumpt -l -w .'
- '{{ .BUILD_ROOT }}/bin/go/gci write -s standard -s default -s ''Prefix(github.com/redpanda-data/console/backend)'' .'
# https://github.com/daixiang0/gci/issues/174
# moves "maps" into the wrong place
# - '{{ .BUILD_ROOT }}/bin/go/gci write -s default -s standard -s ''prefix(github.com/redpanda-data/console/backend)'' .'

lint:
desc: Run Go linters for backend code
Expand Down
Loading