Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(channel sink): add channel sink type #102

Merged
merged 6 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions sinks/channel/channel_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package channel

import (
"context"
"errors"
"fmt"
"time"

"github.com/hashicorp/eventlogger"
)

// ChannelSink is a sink node which sends
// the event to a channel
type ChannelSink struct {
eventChan chan<- *eventlogger.Event

// The time to wait for a write before returning an error
timeoutDuration time.Duration
}

var _ eventlogger.Node = &ChannelSink{}

// newChannelSink creates a ChannelSink
// The time.Duration value is used to set a timeout on the consumer for sending events
// This is to account for consumers having different timeouts than senders
func NewChannelSink(c chan<- *eventlogger.Event, t time.Duration) (*ChannelSink, error) {
irenarindos marked this conversation as resolved.
Show resolved Hide resolved
if c == nil {
return nil, errors.New("missing event channel")
}
if t <= 0 {
return nil, errors.New("duration must be greater than 0")
}

return &ChannelSink{
eventChan: c,
timeoutDuration: t,
}, nil
}

// Process sends the event on a channel
// Process will wait for the ChannelSink timeoutDuration for a write before returning an error
// This is to account for consumers having different timeouts than senders
// Returns a nil event as this is a leaf node
func (c *ChannelSink) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
select {
case c.eventChan <- e:
case <-ctx.Done():
irenarindos marked this conversation as resolved.
Show resolved Hide resolved
return nil, ctx.Err()
case <-time.After(c.timeoutDuration):
return nil, fmt.Errorf("chan write timeout after %s", c.timeoutDuration)
}

return nil, nil
}

// Reopen is a no op
func (c *ChannelSink) Reopen() error {
return nil
}

// Type describes the type of the node as a NodeTypeSink.
func (c *ChannelSink) Type() eventlogger.NodeType {
return eventlogger.NodeTypeSink
}

// Name returns a representation of the ChannelSink's name
func (c *ChannelSink) Name() string {
return "ChannelSink"
}
112 changes: 112 additions & 0 deletions sinks/channel/channel_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package channel

import (
"context"
"testing"
"time"

"github.com/hashicorp/eventlogger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewChannelSink(t *testing.T) {
t.Parallel()

tests := []struct {
name string
c chan *eventlogger.Event
d time.Duration
wantErrContains string
}{
{
name: "missing-channel",
d: time.Second,
wantErrContains: "missing event channel",
},
{
name: "missing-duration",
c: make(chan *eventlogger.Event),
wantErrContains: "duration must be greater than 0",
},
{
name: "valid",
c: make(chan *eventlogger.Event),
d: time.Second,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
got, err := NewChannelSink(tt.c, tt.d)
if tt.wantErrContains != "" {
require.Error(err)
assert.Contains(err.Error(), tt.wantErrContains)
return
}
assert.NotNil(got)
})
}
}

func TestProcess(t *testing.T) {
t.Parallel()

timeoutCtx, _ := context.WithTimeout(context.Background(), 1*time.Nanosecond)

tests := []struct {
name string
c chan *eventlogger.Event
ctx context.Context
d time.Duration
wantErrContains string
}{
{
name: "valid",
c: make(chan *eventlogger.Event, 1),
d: time.Second,
ctx: context.Background(),
},
{
name: "write timeout",
c: make(chan *eventlogger.Event, 0),
d: time.Second,
ctx: context.Background(),
wantErrContains: "chan write timeout",
},
{
name: "context timeout",
c: make(chan *eventlogger.Event, 0),
d: time.Second,
ctx: timeoutCtx,
wantErrContains: "context deadline exceeded",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sink, err := NewChannelSink(tt.c, tt.d)
require.Nil(err)
assert.NotNil(sink)
event := &eventlogger.Event{
Type: "testEvent",
CreatedAt: time.Time{},
Formatted: nil,
Payload: nil,
}
got, err := sink.Process(tt.ctx, event)
require.Nil(got)
if tt.wantErrContains != "" {
require.Error(err)
assert.Contains(err.Error(), tt.wantErrContains)
return
}
require.Nil(err)
})
}
}
5 changes: 5 additions & 0 deletions sinks/channel/docs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

// Package channel implements Sink which sends events to a channel.
package channel
74 changes: 74 additions & 0 deletions sinks/channel/docs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package channel_test

import (
"context"
"fmt"
"time"

"github.com/hashicorp/eventlogger"
"github.com/hashicorp/eventlogger/sinks/channel"
)

func ExampleChannelSink() {
then := time.Date(
2009, 11, 17, 20, 34, 58, 651387237, time.UTC)
// Create a broker
b, _ := eventlogger.NewBroker()

b.StopTimeAt(then) // setting this so the output timestamps are predictable for testing.

// Marshal to JSON
jsonFmt := &eventlogger.JSONFormatter{}

// Send the output to a channel
testChan := make(chan *eventlogger.Event, 1)
chanSink, err := channel.NewChannelSink(testChan, time.Second)
if err != nil {
// handle error
}

// Register the nodes with the broker
nodes := []eventlogger.Node{jsonFmt, chanSink}
nodeIDs := make([]eventlogger.NodeID, len(nodes))
for i, node := range nodes {
id := eventlogger.NodeID(fmt.Sprintf("node-%d", i))
err := b.RegisterNode(id, node)
if err != nil {
// handle error
}
nodeIDs[i] = id
}

et := eventlogger.EventType("test-event")
// Register a pipeline for our event type
err = b.RegisterPipeline(eventlogger.Pipeline{
EventType: et,
PipelineID: "writer-sink-pipeline",
NodeIDs: nodeIDs,
})
if err != nil {
// handle error
}

p := map[string]interface{}{
"name": "bob",
"role": "user",
"pronouns": []string{"they", "them"},
"coworkers": []string{"alice", "eve"},
}
// Send an event
if status, err := b.Send(context.Background(), et, p); err != nil {
// handle err and status.Warnings
fmt.Println("err: ", err)
fmt.Println("warnings: ", status.Warnings)
}

output := <-testChan
fmt.Println(string(output.Formatted["json"]))

// Output:
// {"created_at":"2009-11-17T20:34:58.651387237Z","event_type":"test-event","payload":{"coworkers":["alice","eve"],"name":"bob","pronouns":["they","them"],"role":"user"}}
}