Skip to content

Commit

Permalink
broker: add DeregisterNode function (#103)
Browse files Browse the repository at this point in the history
* broker: add DeregisterNode function
  • Loading branch information
irenarindos authored Sep 15, 2023
1 parent a57f0a0 commit 58c7deb
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 19 deletions.
66 changes: 48 additions & 18 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ type NodeID string
// Accepted options: WithNodeRegistrationPolicy (default: AllowOverwrite).
func (b *Broker) RegisterNode(id NodeID, node Node, opt ...Option) error {
if id == "" {
return errors.New("unable to register node, node ID cannot be empty")
return fmt.Errorf("unable to register node, node ID cannot be empty: %w", ErrInvalidParameter)
}

opts, err := getOpts(opt...)
Expand Down Expand Up @@ -254,6 +254,50 @@ func (b *Broker) RegisterNode(id NodeID, node Node, opt ...Option) error {
return nil
}

// RemoveNode will remove a node from the broker, if it is not currently in use
// This is useful if RegisterNode was used successfully prior to a failed RegisterPipeline call
// referencing those nodes
func (b *Broker) RemoveNode(ctx context.Context, id NodeID) error {
b.lock.Lock()
defer b.lock.Unlock()
return b.removeNode(ctx, id, false)
}

// removeNode will remove a node from the broker, if it is not currently in use.
// This is useful if RegisterNode was used successfully prior to a failed RegisterPipeline call
// referencing those nodes
// The force option can be used to decrement the count for the node if it's still in use by pipelines
// This function assumes that the caller holds a lock
func (b *Broker) removeNode(ctx context.Context, id NodeID, force bool) error {
if id == "" {
return fmt.Errorf("unable to remove node, node ID cannot be empty: %w", ErrInvalidParameter)
}

nodeUsage, ok := b.nodes[id]
if !ok {
return fmt.Errorf("%w: %q", ErrNodeNotFound, id)
}

// if force is passed, then decrement the count for this node instead of failing
if nodeUsage.referenceCount > 0 && !force {
return fmt.Errorf("cannot remove node, as it is still in use by 1 or more pipelines: %q", id)
}

var err error
switch nodeUsage.referenceCount {
case 0, 1:
nc := NewNodeController(nodeUsage.node)
if err = nc.Close(ctx); err != nil {
err = fmt.Errorf("unable to close node ID %q: %w", id, err)
}
delete(b.nodes, id)
default:
nodeUsage.referenceCount--
}

return err
}

// PipelineID is a string that uniquely identifies a Pipeline within a given EventType.
type PipelineID string

Expand Down Expand Up @@ -400,23 +444,9 @@ func (b *Broker) RemovePipelineAndNodes(ctx context.Context, t EventType, id Pip
var nodeErr error

for _, nodeID := range nodes {
nodeUsage, ok := b.nodes[nodeID]
if !ok {
// We might get multiple nodes which cannot be found
nodeErr = multierror.Append(nodeErr, fmt.Errorf("node not found: %q", nodeID))
continue
}

switch nodeUsage.referenceCount {
case 0, 1:
nc := NewNodeController(nodeUsage.node)
if err := nc.Close(ctx); err != nil {
nodeErr = multierror.Append(nodeErr, fmt.Errorf("unable to close node ID %q: %w", nodeID, err))
}
// Node is not currently in use, or was only being used by this pipeline
delete(b.nodes, nodeID)
default:
nodeUsage.referenceCount--
err = b.removeNode(ctx, nodeID, true)
if err != nil {
nodeErr = multierror.Append(nodeErr, err)
}
}

Expand Down
55 changes: 54 additions & 1 deletion broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func TestRegisterNode_NoID(t *testing.T) {
require.NoError(t, err)
err = b.RegisterNode("", &JSONFormatter{})
require.Error(t, err)
require.EqualError(t, err, "unable to register node, node ID cannot be empty")
require.EqualError(t, err, "unable to register node, node ID cannot be empty: invalid parameter")
}

// TestBroker_RegisterNode_AllowOverwrite_Implicit is used to prove that nodes can be
Expand Down Expand Up @@ -692,6 +692,59 @@ func TestBroker_RegisterNode_AllowThenDenyOverwrite(t *testing.T) {
require.Error(t, err)
}

// TestRemoveNode ensures we cannot remove a Node with an empty ID.
func TestRemoveNode(t *testing.T) {
b, err := NewBroker()
require.NoError(t, err)
err = b.RegisterNode("n1", &JSONFormatter{})
require.NoError(t, err)
err = b.RemoveNode(context.Background(), "n1")
require.NoError(t, err)
}

// TestRemoveNode_NoID ensures we cannot remove a Node with an empty ID.
func TestRemoveNode_NoID(t *testing.T) {
b, err := NewBroker()
require.NoError(t, err)
err = b.RemoveNode(context.Background(), "")
require.Error(t, err)
require.EqualError(t, err, "unable to remove node, node ID cannot be empty: invalid parameter")
}

// TestRemoveNode_NotFound ensures we cannot remove a Node that has not been registered
func TestRemoveNode_NotFound(t *testing.T) {
b, err := NewBroker()
require.NoError(t, err)
err = b.RemoveNode(context.Background(), "n1")
require.Error(t, err)
require.EqualError(t, err, "node not found: \"n1\"")
}

// TestRemoveNode_StillReferenced ensures we cannot remote a Node that is still referenced by a pipeline
func TestRemoveNode_StillReferenced(t *testing.T) {
b, err := NewBroker()
require.NoError(t, err)
err = b.RegisterNode("n1", &JSONFormatter{})
require.NoError(t, err)
b.nodes["n1"].referenceCount = 2
err = b.RemoveNode(context.Background(), "n1")
require.Error(t, err)
require.EqualError(t, err, "cannot remove node, as it is still in use by 1 or more pipelines: \"n1\"")
}

// TestDeregisterNode_Force ensures we can decrement the reference to a Node that is still referenced
// by a pipeline by using the force option
func TestRemoveNode_StillReferencedDecrement(t *testing.T) {
b, err := NewBroker()
require.NoError(t, err)
err = b.RegisterNode("n1", &JSONFormatter{})
require.NoError(t, err)
b.nodes["n1"].referenceCount = 2
err = b.removeNode(context.Background(), "n1", true)
require.NoError(t, err)
require.Equal(t, 1, b.nodes["n1"].referenceCount)
}

// TestBroker_RegisterPipeline_AllowOverwrite_Implicit is used to prove that pipelines can be
// overwritten when a Broker has been implicitly configured with the AllowOverwrite policy.
// This is the default in order to maintain pre-existing behavior.
Expand Down
1 change: 1 addition & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ import "errors"

var (
ErrInvalidParameter = errors.New("invalid parameter")
ErrNodeNotFound = errors.New("node not found")
)

0 comments on commit 58c7deb

Please sign in to comment.