From bd01b6fd8bd60589e91ad1fe1f7ff92c485c61ae Mon Sep 17 00:00:00 2001 From: irenarindos Date: Thu, 7 Sep 2023 15:44:52 -0400 Subject: [PATCH 1/6] feat(channel sink): add channel sink type --- sinks/channel/channel_sink.go | 59 ++++++++++++++++++++++++ sinks/channel/channel_sink_test.go | 44 ++++++++++++++++++ sinks/channel/docs.go | 5 ++ sinks/channel/docs_test.go | 74 ++++++++++++++++++++++++++++++ 4 files changed, 182 insertions(+) create mode 100644 sinks/channel/channel_sink.go create mode 100644 sinks/channel/channel_sink_test.go create mode 100644 sinks/channel/docs.go create mode 100644 sinks/channel/docs_test.go diff --git a/sinks/channel/channel_sink.go b/sinks/channel/channel_sink.go new file mode 100644 index 0000000..87287ee --- /dev/null +++ b/sinks/channel/channel_sink.go @@ -0,0 +1,59 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package channel + +import ( + "context" + "errors" + "sync" + + "github.com/hashicorp/eventlogger" +) + +// ChannelSink is a sink node which sends +// the event to a channel +type ChannelSink struct { + mu sync.Mutex + + eventChan chan *eventlogger.Event +} + +var _ eventlogger.Node = &ChannelSink{} + +// newChannelSink creates a ChannelSink +func NewChannelSink(c chan *eventlogger.Event) (*ChannelSink, error) { + if c == nil { + return nil, errors.New("missing event channel") + } + + return &ChannelSink{ + eventChan: c, + }, nil +} + +// Process sends the event on a channel +// Returns a nil event as this is a leaf node +func (c *ChannelSink) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) { + c.mu.Lock() + defer c.mu.Unlock() + + c.eventChan <- e + + return nil, nil +} + +// Reopen is a no op +func (c *ChannelSink) Reopen() error { + return nil +} + +// Type describes the type of the node as a NodeTypeSink. +func (c *ChannelSink) Type() eventlogger.NodeType { + return eventlogger.NodeTypeSink +} + +// Name returns a representation of the ChannelSink's name +func (c *ChannelSink) Name() string { + return "ChannelSink" +} diff --git a/sinks/channel/channel_sink_test.go b/sinks/channel/channel_sink_test.go new file mode 100644 index 0000000..cb40eaa --- /dev/null +++ b/sinks/channel/channel_sink_test.go @@ -0,0 +1,44 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package channel + +import ( + "testing" + + "github.com/hashicorp/eventlogger" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewChannelSink(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + c chan *eventlogger.Event + wantErrContains string + }{ + { + name: "missing-channel", + wantErrContains: "missing event channel", + }, + { + name: "valid", + c: make(chan *eventlogger.Event), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert, require := assert.New(t), require.New(t) + got, err := NewChannelSink(tt.c) + if tt.wantErrContains != "" { + require.Error(err) + assert.Contains(err.Error(), tt.wantErrContains) + return + } + assert.NotNil(got) + }) + } +} diff --git a/sinks/channel/docs.go b/sinks/channel/docs.go new file mode 100644 index 0000000..2f1e0be --- /dev/null +++ b/sinks/channel/docs.go @@ -0,0 +1,5 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +// Package channel implements Sink which sends events to a channel. +package channel diff --git a/sinks/channel/docs_test.go b/sinks/channel/docs_test.go new file mode 100644 index 0000000..3052f23 --- /dev/null +++ b/sinks/channel/docs_test.go @@ -0,0 +1,74 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package channel_test + +import ( + "context" + "fmt" + "time" + + "github.com/hashicorp/eventlogger" + "github.com/hashicorp/eventlogger/sinks/channel" +) + +func ExampleChannelSink() { + then := time.Date( + 2009, 11, 17, 20, 34, 58, 651387237, time.UTC) + // Create a broker + b, _ := eventlogger.NewBroker() + + b.StopTimeAt(then) // setting this so the output timestamps are predictable for testing. + + // Marshal to JSON + jsonFmt := &eventlogger.JSONFormatter{} + + // Send the output to a channel + testChan := make(chan *eventlogger.Event, 1) + chanSink, err := channel.NewChannelSink(testChan) + if err != nil { + // handle error + } + + // Register the nodes with the broker + nodes := []eventlogger.Node{jsonFmt, chanSink} + nodeIDs := make([]eventlogger.NodeID, len(nodes)) + for i, node := range nodes { + id := eventlogger.NodeID(fmt.Sprintf("node-%d", i)) + err := b.RegisterNode(id, node) + if err != nil { + // handle error + } + nodeIDs[i] = id + } + + et := eventlogger.EventType("test-event") + // Register a pipeline for our event type + err = b.RegisterPipeline(eventlogger.Pipeline{ + EventType: et, + PipelineID: "writer-sink-pipeline", + NodeIDs: nodeIDs, + }) + if err != nil { + // handle error + } + + p := map[string]interface{}{ + "name": "bob", + "role": "user", + "pronouns": []string{"they", "them"}, + "coworkers": []string{"alice", "eve"}, + } + // Send an event + if status, err := b.Send(context.Background(), et, p); err != nil { + // handle err and status.Warnings + fmt.Println("err: ", err) + fmt.Println("warnings: ", status.Warnings) + } + + output := <-testChan + fmt.Println(string(output.Formatted["json"])) + + // Output: + // {"created_at":"2009-11-17T20:34:58.651387237Z","event_type":"test-event","payload":{"coworkers":["alice","eve"],"name":"bob","pronouns":["they","them"],"role":"user"}} +} From 0faa8ae4e93ed6d36cce6a521eb5584e8704d3ac Mon Sep 17 00:00:00 2001 From: irenarindos Date: Mon, 11 Sep 2023 09:03:01 -0400 Subject: [PATCH 2/6] fixup! feat(channel sink): add channel sink type --- sinks/channel/channel_sink.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/sinks/channel/channel_sink.go b/sinks/channel/channel_sink.go index 87287ee..945a6e9 100644 --- a/sinks/channel/channel_sink.go +++ b/sinks/channel/channel_sink.go @@ -6,7 +6,8 @@ package channel import ( "context" "errors" - "sync" + "fmt" + "time" "github.com/hashicorp/eventlogger" ) @@ -14,31 +15,40 @@ import ( // ChannelSink is a sink node which sends // the event to a channel type ChannelSink struct { - mu sync.Mutex + eventChan chan<- *eventlogger.Event - eventChan chan *eventlogger.Event + // The time to wait for a write before returning an error + timeoutDuration time.Duration } var _ eventlogger.Node = &ChannelSink{} // newChannelSink creates a ChannelSink -func NewChannelSink(c chan *eventlogger.Event) (*ChannelSink, error) { +func NewChannelSink(c chan<- *eventlogger.Event, t time.Duration) (*ChannelSink, error) { if c == nil { return nil, errors.New("missing event channel") } + if t <= 0 { + return nil, errors.New("duration must be greater than 0") + } return &ChannelSink{ - eventChan: c, + eventChan: c, + timeoutDuration: t, }, nil } // Process sends the event on a channel +// Process will wait for the ChannelSink timeoutDuration for a write before returning an error // Returns a nil event as this is a leaf node func (c *ChannelSink) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) { - c.mu.Lock() - defer c.mu.Unlock() - - c.eventChan <- e + select { + case c.eventChan <- e: + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(c.timeoutDuration): + return nil, fmt.Errorf("%s: chan write timeout") + } return nil, nil } From 56fd360a22bc3df66b5edb611ff65a4747cc9bbc Mon Sep 17 00:00:00 2001 From: irenarindos Date: Mon, 11 Sep 2023 10:44:23 -0400 Subject: [PATCH 3/6] fixup! feat(channel sink): add channel sink type --- sinks/channel/channel_sink.go | 2 +- sinks/channel/channel_sink_test.go | 11 ++++++++++- sinks/channel/docs_test.go | 2 +- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sinks/channel/channel_sink.go b/sinks/channel/channel_sink.go index 945a6e9..dd54bc5 100644 --- a/sinks/channel/channel_sink.go +++ b/sinks/channel/channel_sink.go @@ -47,7 +47,7 @@ func (c *ChannelSink) Process(ctx context.Context, e *eventlogger.Event) (*event case <-ctx.Done(): return nil, ctx.Err() case <-time.After(c.timeoutDuration): - return nil, fmt.Errorf("%s: chan write timeout") + return nil, fmt.Errorf("chan write timeout after %s", c.timeoutDuration) } return nil, nil diff --git a/sinks/channel/channel_sink_test.go b/sinks/channel/channel_sink_test.go index cb40eaa..62f8750 100644 --- a/sinks/channel/channel_sink_test.go +++ b/sinks/channel/channel_sink_test.go @@ -5,6 +5,7 @@ package channel import ( "testing" + "time" "github.com/hashicorp/eventlogger" "github.com/stretchr/testify/assert" @@ -17,22 +18,30 @@ func TestNewChannelSink(t *testing.T) { tests := []struct { name string c chan *eventlogger.Event + d time.Duration wantErrContains string }{ { name: "missing-channel", + d: time.Second, wantErrContains: "missing event channel", }, + { + name: "missing-duration", + c: make(chan *eventlogger.Event), + wantErrContains: "duration must be greater than 0", + }, { name: "valid", c: make(chan *eventlogger.Event), + d: time.Second, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { assert, require := assert.New(t), require.New(t) - got, err := NewChannelSink(tt.c) + got, err := NewChannelSink(tt.c, tt.d) if tt.wantErrContains != "" { require.Error(err) assert.Contains(err.Error(), tt.wantErrContains) diff --git a/sinks/channel/docs_test.go b/sinks/channel/docs_test.go index 3052f23..c4d59c3 100644 --- a/sinks/channel/docs_test.go +++ b/sinks/channel/docs_test.go @@ -25,7 +25,7 @@ func ExampleChannelSink() { // Send the output to a channel testChan := make(chan *eventlogger.Event, 1) - chanSink, err := channel.NewChannelSink(testChan) + chanSink, err := channel.NewChannelSink(testChan, time.Second) if err != nil { // handle error } From 1f3a1316f768df9b24eefb3bafd6557c95e61c73 Mon Sep 17 00:00:00 2001 From: irenarindos Date: Mon, 11 Sep 2023 14:52:33 -0400 Subject: [PATCH 4/6] fixup! feat(channel sink): add channel sink type --- sinks/channel/channel_sink.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sinks/channel/channel_sink.go b/sinks/channel/channel_sink.go index dd54bc5..f8bfaa5 100644 --- a/sinks/channel/channel_sink.go +++ b/sinks/channel/channel_sink.go @@ -40,6 +40,7 @@ func NewChannelSink(c chan<- *eventlogger.Event, t time.Duration) (*ChannelSink, // Process sends the event on a channel // Process will wait for the ChannelSink timeoutDuration for a write before returning an error +// This is to account for consumers having different timeouts than senders // Returns a nil event as this is a leaf node func (c *ChannelSink) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) { select { From 63f6d596e77a76abbf0e48f5adfd3fa323515db9 Mon Sep 17 00:00:00 2001 From: irenarindos Date: Mon, 11 Sep 2023 15:31:51 -0400 Subject: [PATCH 5/6] fixup! feat(channel sink): add channel sink type --- sinks/channel/channel_sink.go | 2 + sinks/channel/channel_sink_test.go | 59 ++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/sinks/channel/channel_sink.go b/sinks/channel/channel_sink.go index f8bfaa5..f7bf98a 100644 --- a/sinks/channel/channel_sink.go +++ b/sinks/channel/channel_sink.go @@ -24,6 +24,8 @@ type ChannelSink struct { var _ eventlogger.Node = &ChannelSink{} // newChannelSink creates a ChannelSink +// The time.Duration value is used to set a timeout on the consumer for sending events +// This is to account for consumers having different timeouts than senders func NewChannelSink(c chan<- *eventlogger.Event, t time.Duration) (*ChannelSink, error) { if c == nil { return nil, errors.New("missing event channel") diff --git a/sinks/channel/channel_sink_test.go b/sinks/channel/channel_sink_test.go index 62f8750..e01fd3a 100644 --- a/sinks/channel/channel_sink_test.go +++ b/sinks/channel/channel_sink_test.go @@ -4,6 +4,7 @@ package channel import ( + "context" "testing" "time" @@ -51,3 +52,61 @@ func TestNewChannelSink(t *testing.T) { }) } } + +func TestProcess(t *testing.T) { + t.Parallel() + + timeoutCtx, _ := context.WithTimeout(context.Background(), 1*time.Nanosecond) + tests := []struct { + name string + c chan *eventlogger.Event + ctx context.Context + d time.Duration + wantErrContains string + }{ + { + name: "valid", + c: make(chan *eventlogger.Event, 1), + d: time.Second, + ctx: context.Background(), + }, + { + name: "write timeout", + c: make(chan *eventlogger.Event, 0), + d: time.Second, + ctx: context.Background(), + wantErrContains: "chan write timeout", + }, + { + name: "context timeout", + c: make(chan *eventlogger.Event, 0), + d: time.Second, + ctx: timeoutCtx, + wantErrContains: "context deadline exceeded", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert, require := assert.New(t), require.New(t) + sink, err := NewChannelSink(tt.c, tt.d) + require.Nil(err) + assert.NotNil(sink) + event := &eventlogger.Event{ + Type: "testEvent", + CreatedAt: time.Time{}, + Formatted: nil, + Payload: nil, + } + got, err := sink.Process(tt.ctx, event) + require.Nil(got) + if tt.wantErrContains != "" { + require.Error(err) + assert.Contains(err.Error(), tt.wantErrContains) + return + } + require.Nil(err) + + }) + } +} From 683c2bb1f9368d716bfd1cf3ab8bc246b5d7b4aa Mon Sep 17 00:00:00 2001 From: irenarindos Date: Mon, 11 Sep 2023 15:32:39 -0400 Subject: [PATCH 6/6] fixup! feat(channel sink): add channel sink type --- sinks/channel/channel_sink_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sinks/channel/channel_sink_test.go b/sinks/channel/channel_sink_test.go index e01fd3a..434d8ec 100644 --- a/sinks/channel/channel_sink_test.go +++ b/sinks/channel/channel_sink_test.go @@ -57,6 +57,7 @@ func TestProcess(t *testing.T) { t.Parallel() timeoutCtx, _ := context.WithTimeout(context.Background(), 1*time.Nanosecond) + tests := []struct { name string c chan *eventlogger.Event @@ -106,7 +107,6 @@ func TestProcess(t *testing.T) { return } require.Nil(err) - }) } }