Skip to content

Commit

Permalink
Minor refactor to use RuleRegistry (#22) (#1289)
Browse files Browse the repository at this point in the history
* Minor refactor to use RuleRegistry

* Minor cleanup

* Minor cleanup
  • Loading branch information
rayokota authored Sep 9, 2024
1 parent 3ee4497 commit f691087
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 75 deletions.
16 changes: 6 additions & 10 deletions schemaregistry/serde/avrov2/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,9 @@ func NewSerializer(client schemaregistry.Client, serdeType serde.Type, conf *Ser
return s.FieldTransform(s.Client, ctx, fieldTransform, msg)
}
s.FieldTransformer = fieldTransformer
for _, rule := range serde.GetRuleExecutors() {
err = rule.Configure(client.Config(), conf.RuleConfig)
if err != nil {
return nil, err
}
err = s.SetRuleRegistry(serde.GlobalRuleRegistry(), conf.RuleConfig)
if err != nil {
return nil, err
}
return s, nil
}
Expand Down Expand Up @@ -146,11 +144,9 @@ func NewDeserializer(client schemaregistry.Client, serdeType serde.Type, conf *D
return s.FieldTransform(s.Client, ctx, fieldTransform, msg)
}
s.FieldTransformer = fieldTransformer
for _, rule := range serde.GetRuleExecutors() {
err = rule.Configure(client.Config(), conf.RuleConfig)
if err != nil {
return nil, err
}
err = s.SetRuleRegistry(serde.GlobalRuleRegistry(), conf.RuleConfig)
if err != nil {
return nil, err
}
return s, nil
}
Expand Down
16 changes: 6 additions & 10 deletions schemaregistry/serde/jsonschema/json_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,9 @@ func NewSerializer(client schemaregistry.Client, serdeType serde.Type, conf *Ser
return s.FieldTransform(s.Client, ctx, fieldTransform, msg)
}
s.FieldTransformer = fieldTransformer
for _, rule := range serde.GetRuleExecutors() {
err = rule.Configure(client.Config(), conf.RuleConfig)
if err != nil {
return nil, err
}
err = s.SetRuleRegistry(serde.GlobalRuleRegistry(), conf.RuleConfig)
if err != nil {
return nil, err
}
return s, nil
}
Expand Down Expand Up @@ -162,11 +160,9 @@ func NewDeserializer(client schemaregistry.Client, serdeType serde.Type, conf *D
return s.FieldTransform(s.Client, ctx, fieldTransform, msg)
}
s.FieldTransformer = fieldTransformer
for _, rule := range serde.GetRuleExecutors() {
err = rule.Configure(client.Config(), conf.RuleConfig)
if err != nil {
return nil, err
}
err = s.SetRuleRegistry(serde.GlobalRuleRegistry(), conf.RuleConfig)
if err != nil {
return nil, err
}
return s, nil
}
Expand Down
16 changes: 6 additions & 10 deletions schemaregistry/serde/protobuf/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,9 @@ func NewSerializer(client schemaregistry.Client, serdeType serde.Type, conf *Ser
if err != nil {
return nil, err
}
for _, rule := range serde.GetRuleExecutors() {
err = rule.Configure(client.Config(), conf.RuleConfig)
if err != nil {
return nil, err
}
err = s.SetRuleRegistry(serde.GlobalRuleRegistry(), conf.RuleConfig)
if err != nil {
return nil, err
}
return s, nil
}
Expand Down Expand Up @@ -498,11 +496,9 @@ func NewDeserializer(client schemaregistry.Client, serdeType serde.Type, conf *D
return s.FieldTransform(s.Client, ctx, fieldTransform, msg)
}
s.FieldTransformer = fieldTransformer
for _, rule := range serde.GetRuleExecutors() {
err = rule.Configure(client.Config(), conf.RuleConfig)
if err != nil {
return nil, err
}
err = s.SetRuleRegistry(serde.GlobalRuleRegistry(), conf.RuleConfig)
if err != nil {
return nil, err
}
return s, nil
}
Expand Down
109 changes: 66 additions & 43 deletions schemaregistry/serde/rule_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,74 +21,97 @@ import (
)

var (
globalInstance = RuleRegistry{
ruleExecutors: make(map[string]RuleExecutor),
ruleActions: make(map[string]RuleAction),
}
)

// RuleRegistry is used to store all registered rule executors and actions.
type RuleRegistry struct {
ruleExecutorsMu sync.RWMutex
ruleExecutors = map[string]RuleExecutor{}
ruleExecutors map[string]RuleExecutor
ruleActionsMu sync.RWMutex
ruleActions = map[string]RuleAction{}
)
ruleActions map[string]RuleAction
}

// RegisterRuleExecutor is used to register a new rule executor.
func RegisterRuleExecutor(ruleExecutor RuleExecutor) {
ruleExecutorsMu.Lock()
defer ruleExecutorsMu.Unlock()
ruleExecutors[ruleExecutor.Type()] = ruleExecutor
// RegisterExecutor is used to register a new rule executor.
func (r *RuleRegistry) RegisterExecutor(ruleExecutor RuleExecutor) {
r.ruleExecutorsMu.Lock()
defer r.ruleExecutorsMu.Unlock()
r.ruleExecutors[ruleExecutor.Type()] = ruleExecutor
}

// GetRuleExecutor fetches a rule executor by a given name.
func GetRuleExecutor(name string) RuleExecutor {
ruleExecutorsMu.RLock()
defer ruleExecutorsMu.RUnlock()
return ruleExecutors[name]
// GetExecutor fetches a rule executor by a given name.
func (r *RuleRegistry) GetExecutor(name string) RuleExecutor {
r.ruleExecutorsMu.RLock()
defer r.ruleExecutorsMu.RUnlock()
return r.ruleExecutors[name]
}

// GetRuleExecutors fetches all rule executors
func GetRuleExecutors() []RuleExecutor {
ruleExecutorsMu.RLock()
defer ruleExecutorsMu.RUnlock()
// GetExecutors fetches all rule executors
func (r *RuleRegistry) GetExecutors() []RuleExecutor {
r.ruleExecutorsMu.RLock()
defer r.ruleExecutorsMu.RUnlock()
var result []RuleExecutor
for _, v := range ruleExecutors {
for _, v := range r.ruleExecutors {
result = append(result, v)
}
return result
}

// RegisterRuleAction is used to register a new rule action.
func RegisterRuleAction(ruleAction RuleAction) {
ruleActionsMu.Lock()
defer ruleActionsMu.Unlock()
ruleActions[ruleAction.Type()] = ruleAction
// RegisterAction is used to register a new global rule action.
func (r *RuleRegistry) RegisterAction(ruleAction RuleAction) {
r.ruleActionsMu.Lock()
defer r.ruleActionsMu.Unlock()
r.ruleActions[ruleAction.Type()] = ruleAction
}

// GetRuleAction fetches a rule action by a given name.
func GetRuleAction(name string) RuleAction {
ruleActionsMu.RLock()
defer ruleActionsMu.RUnlock()
return ruleActions[name]
// GetAction fetches a rule action by a given name.
func (r *RuleRegistry) GetAction(name string) RuleAction {
r.ruleActionsMu.RLock()
defer r.ruleActionsMu.RUnlock()
return r.ruleActions[name]
}

// GetRuleActions fetches all rule actions
func GetRuleActions() []RuleAction {
ruleActionsMu.RLock()
defer ruleActionsMu.RUnlock()
// GetActions fetches all rule actions
func (r *RuleRegistry) GetActions() []RuleAction {
r.ruleActionsMu.RLock()
defer r.ruleActionsMu.RUnlock()
var result []RuleAction
for _, v := range ruleActions {
for _, v := range r.ruleActions {
result = append(result, v)
}
return result
}

// ClearRules clears all registered rules
func ClearRules() {
ruleActionsMu.Lock()
defer ruleActionsMu.Unlock()
for k, v := range ruleActions {
// Clear clears all registered rules
func (r *RuleRegistry) Clear() {
r.ruleActionsMu.Lock()
defer r.ruleActionsMu.Unlock()
for k, v := range r.ruleActions {
_ = v.Close()
delete(ruleActions, k)
delete(r.ruleActions, k)
}
ruleExecutorsMu.Lock()
defer ruleExecutorsMu.Unlock()
for k, v := range ruleExecutors {
r.ruleExecutorsMu.Lock()
defer r.ruleExecutorsMu.Unlock()
for k, v := range r.ruleExecutors {
_ = v.Close()
delete(ruleExecutors, k)
delete(r.ruleExecutors, k)
}
}

// GlobalRuleRegistry returns the global rule registry.
func GlobalRuleRegistry() *RuleRegistry {
return &globalInstance
}

// RegisterRuleExecutor is used to register a new global rule executor.
func RegisterRuleExecutor(ruleExecutor RuleExecutor) {
globalInstance.RegisterExecutor(ruleExecutor)
}

// RegisterRuleAction is used to register a new global rule action.
func RegisterRuleAction(ruleAction RuleAction) {
globalInstance.RegisterAction(ruleAction)
}
17 changes: 15 additions & 2 deletions schemaregistry/serde/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Serde struct {
SubjectNameStrategy SubjectNameStrategyFunc
MessageFactory MessageFactory
FieldTransformer FieldTransformer
RuleRegistry *RuleRegistry
}

// BaseSerializer represents basic serializer info
Expand Down Expand Up @@ -462,6 +463,18 @@ func (s *BaseSerializer) GetID(topic string, msg interface{}, info *schemaregist
return id, nil
}

// SetRuleRegistry sets the rule registry
func (s *Serde) SetRuleRegistry(registry *RuleRegistry, ruleConfig map[string]string) error {
s.RuleRegistry = registry
for _, rule := range registry.GetExecutors() {
err := rule.Configure(s.Client.Config(), ruleConfig)
if err != nil {
return err
}
}
return nil
}

// GetMigrations returns the migration rules for the given subject
func (s *Serde) GetMigrations(subject string, topic string, sourceInfo *schemaregistry.SchemaInfo,
target *schemaregistry.SchemaMetadata, msg interface{}) ([]Migration, error) {
Expand Down Expand Up @@ -625,7 +638,7 @@ func (s *Serde) ExecuteRules(subject string, topic string, ruleMode schemaregist
Rules: rules,
FieldTransformer: s.FieldTransformer,
}
ruleExecutor := GetRuleExecutor(rule.Type)
ruleExecutor := s.RuleRegistry.GetExecutor(rule.Type)
if ruleExecutor == nil {
err := s.runAction(ctx, ruleMode, rule, rule.OnFailure, msg,
fmt.Errorf("could not find rule executor of type %s", rule.Type), "ERROR")
Expand Down Expand Up @@ -721,7 +734,7 @@ func (s *Serde) getRuleAction(_ RuleContext, actionName string) RuleAction {
} else if actionName == "NONE" {
return NoneAction{}
} else {
return GetRuleAction(actionName)
return s.RuleRegistry.GetAction(actionName)
}
}

Expand Down

0 comments on commit f691087

Please sign in to comment.