Skip to content

Commit

Permalink
Merge pull request #43 from blinklabs-io/feat/output-embedded
Browse files Browse the repository at this point in the history
feat: embedded output plugin
  • Loading branch information
agaffney authored Jul 7, 2023
2 parents b7af053 + 3184a86 commit 1626711
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 0 deletions.
1 change: 1 addition & 0 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package filter

// We import the various plugins that we want to be auto-registered
import (
_ "github.com/blinklabs-io/snek/filter/chainsync"
_ "github.com/blinklabs-io/snek/filter/event"
Expand Down
1 change: 1 addition & 0 deletions input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package input

// We import the various plugins that we want to be auto-registered
import (
_ "github.com/blinklabs-io/snek/input/chainsync"
)
89 changes: 89 additions & 0 deletions output/embedded/embedded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2023 Blink Labs, LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package embedded

import (
"fmt"

"github.com/blinklabs-io/snek/event"
)

type CallbackFunc func(event.Event) error

type EmbeddedOutput struct {
errorChan chan error
eventChan chan event.Event
callbackFunc CallbackFunc
outputChan chan event.Event
}

func New(options ...EmbeddedOptionFunc) *EmbeddedOutput {
e := &EmbeddedOutput{
errorChan: make(chan error),
eventChan: make(chan event.Event, 10),
}
for _, option := range options {
option(e)
}
return e
}

// Start the embedded output
func (e *EmbeddedOutput) Start() error {
go func() {
for {
evt, ok := <-e.eventChan
// Channel has been closed, which means we're shutting down
if !ok {
return
}
if e.callbackFunc != nil {
if err := e.callbackFunc(evt); err != nil {
e.errorChan <- fmt.Errorf("callback function error: %s", err)
return
}
}
if e.outputChan != nil {
e.outputChan <- evt
}
}
}()
return nil
}

// Stop the embedded output
func (e *EmbeddedOutput) Stop() error {
close(e.eventChan)
close(e.errorChan)
if e.outputChan != nil {
close(e.outputChan)
}
return nil
}

// ErrorChan returns the input error channel
func (e *EmbeddedOutput) ErrorChan() chan error {
return e.errorChan
}

// InputChan returns the input event channel
func (e *EmbeddedOutput) InputChan() chan<- event.Event {
return e.eventChan
}

// OutputChan always returns nil
func (e *EmbeddedOutput) OutputChan() <-chan event.Event {
return nil
}
33 changes: 33 additions & 0 deletions output/embedded/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 Blink Labs, LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package embedded

import "github.com/blinklabs-io/snek/event"

type EmbeddedOptionFunc func(*EmbeddedOutput)

// WithCallbackFunc specifies a callback function for events
func WithCallbackFunc(callbackFunc CallbackFunc) EmbeddedOptionFunc {
return func(o *EmbeddedOutput) {
o.callbackFunc = callbackFunc
}
}

// WithOutputChan specifies an event.Event channel to use for events
func WithOutputChan(outputChan chan event.Event) EmbeddedOptionFunc {
return func(o *EmbeddedOutput) {
o.outputChan = outputChan
}
}
1 change: 1 addition & 0 deletions output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package output

// We import the various plugins that we want to be auto-registered
import (
_ "github.com/blinklabs-io/snek/output/log"
)

0 comments on commit 1626711

Please sign in to comment.