Skip to content

Commit

Permalink
WIP 201902081650
Browse files Browse the repository at this point in the history
  • Loading branch information
awillis committed Feb 9, 2019
1 parent 9272a2f commit 02e3fe1
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 40 deletions.
4 changes: 0 additions & 4 deletions core/logger.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package core

import (
"fmt"
"os"
"runtime"
"strings"
Expand Down Expand Up @@ -29,8 +28,6 @@ func LogConfig(name string, id string) *zap.Config {

if name == "core" {
fields["pid"] = id
} else {
basename = name + "-" + id
}

logfile := new(strings.Builder)
Expand All @@ -42,7 +39,6 @@ func LogConfig(name string, id string) *zap.Config {
logfile.WriteRune(os.PathSeparator)
logfile.WriteString(basename)
logfile.WriteString(".log")
fmt.Println(logfile.String())

//fmt.Println(filepath.FromSlash(filepath.Clean(logfile.String())))

Expand Down
2 changes: 1 addition & 1 deletion message/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Batch struct {
CancelIter context.CancelFunc
}

func NewBatch(size uint) *Batch {
func NewBatch(size uint64) *Batch {
return &Batch{
msgs: make([]*Message, 0, size),
}
Expand Down
2 changes: 1 addition & 1 deletion pipeline/assemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func assembleConfig(config Config) (pipe *Pipe) {

attachProcessorToPipe(pipe, config.RejectSink, plugin.SINK)
attachProcessorToPipe(pipe, config.AcceptSink, plugin.SINK)
pipe.Configure()
pipe.ConfigureAndInitialize(config.Pipe)
return
}

Expand Down
22 changes: 22 additions & 0 deletions pipeline/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@ type (

Config struct {
Name string
Pipe PipeConfig
Source ProcessorConfig
AcceptSink ProcessorConfig
RejectSink ProcessorConfig
Conduit []ProcessorConfig
}

PipeConfig struct {
PollInterval uint64 `toml:"poll_interval"`
BatchSize uint64 `toml:"batch_size"`
TableLoadingMode string `toml:"table_loading_mode"`
ValueLogLoadingMode string `toml:"value_log_loading_mode"`
}

ProcessorConfig struct {
Plugin string `toml:"plugin"`
Option OptionMap `toml:"option"`
Expand Down Expand Up @@ -63,6 +71,20 @@ func ReadConfigurationFile(filename string) (config Config, err error) {
return config, errors.Wrap(ErrConfigValue, "pipeline name")
}

// pipe
pipe := tree.Get("pipe")
switch pipe.(type) {
case *toml.Tree:
if pipeConf, ok := pipe.(*toml.Tree); ok {
if e := pipeConf.Unmarshal(&config.Pipe); e != nil {
return config, e
}
}
default:
pos := tree.Position()
return config, errors.Wrapf(ErrConfigSection, "pipe at line %d, column %d", pos.Line, pos.Col)
}

// source
source := tree.Get("source")
switch source.(type) {
Expand Down
59 changes: 52 additions & 7 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package pipeline

import (
"errors"
"fmt"
"github.com/google/uuid"
"go.uber.org/zap"
"os"
"strings"
"time"

"github.com/awillis/sluus/core"
"github.com/awillis/sluus/plugin"
Expand All @@ -17,10 +21,12 @@ var (

type (
Component struct {
id uint
next *Component
pipe *Pipe
Value processor.Interface
}

Pipe struct {
Id string
Name string
Expand Down Expand Up @@ -107,31 +113,61 @@ func (p *Pipe) Attach(component *Component) {
for n := &p.root; n != component; n = n.Next() {
if n.Next() == nil {
component.Value.SetLogger(p.logger)
if e := component.Value.Initialize(); e != nil {
p.Logger().Error(e)
}
p.len++
component.id = p.len
n.pipe = p
n.next = component
}
}
}

func (p *Pipe) Configure() {
func (p *Pipe) ConfigureAndInitialize(pipeConf PipeConfig) {

reject := processor.Reject(p.Reject().Value.Sluus().Input())
accept := processor.Accept(p.Accept().Value.Sluus().Input())
pollIntvl := processor.PollInterval(time.Duration(pipeConf.PollInterval) * time.Second)
batchSize := processor.BatchSize(pipeConf.BatchSize)
tableMode := processor.TableLoadingMode(pipeConf.TableLoadingMode)
valueMode := processor.ValueLogLoadingMode(pipeConf.ValueLogLoadingMode)

for n := p.Source(); n != nil; n = n.Next() {

if err := n.Value.Sluus().Configure(
reject,
accept,
pollIntvl,
batchSize,
); err != nil {
p.Logger().Error(err)
}

dir := dataDirBuilder(p.Name)

dir.WriteString(fmt.Sprintf("%d-%s-%s",
n.id,
plugin.TypeName(n.Value.Plugin().Type()),
n.Value.Plugin().Name()))

for n := &p.root; n.Next() == nil; n = n.Next() {
if err := n.Value.Sluus().Configure(reject, accept); err != nil {
dataDir := processor.DataDir(dir.String())

if err := n.Value.Sluus().Queue().Configure(
dataDir,
tableMode,
valueMode,
); err != nil {
p.Logger().Error(err)
}

if n.Next() != p.Accept() {
if n != p.Accept() {
input := processor.Input(n.Value.Sluus().Output())
if err := n.Next().Value.Sluus().Configure(input); err != nil {
p.Logger().Error(err)
}
}

if e := n.Value.Initialize(); e != nil {
p.Logger().Error(e)
}
}
}

Expand Down Expand Up @@ -171,3 +207,12 @@ func (p *Pipe) Add(proc processor.Interface) (err error) {
p.Attach(component)
return
}

func dataDirBuilder(pipeName string) (dirpath *strings.Builder) {
dirpath = new(strings.Builder)
dirpath.WriteString(core.DATADIR)
dirpath.WriteRune(os.PathSeparator)
dirpath.WriteString(pipeName)
dirpath.WriteRune(os.PathSeparator)
return
}
2 changes: 1 addition & 1 deletion plugin/tcp/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type options struct {
// TCP port number to listen on
port int
// batch size
batchSize int
batchSize uint64
// application buffer size
bufferSize int
// OS socket buffer size, a portion of which will be allocated for the app
Expand Down
4 changes: 2 additions & 2 deletions plugin/tcp/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (s *Source) handleConnection(conn *net.TCPConn) {

func (s *Source) Collector() {

batch := message.NewBatch(uint(s.opts.batchSize))
batch := message.NewBatch(s.opts.batchSize)

for {
select {
Expand All @@ -175,7 +175,7 @@ func (s *Source) Collector() {
s.Logger().Error(err)
}
} else {
b := message.NewBatch(uint(batch.Count()))
b := message.NewBatch(batch.Count())
for msg := range batch.Iter() {
_ = b.Add(msg)
}
Expand Down
28 changes: 23 additions & 5 deletions processor/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ func Accept(accept *ring.RingBuffer) SluusOpt {

func PollInterval(duration time.Duration) SluusOpt {
return func(s *Sluus) (err error) {
if duration < time.Second {
duration = time.Second
if duration < 3*time.Second {
duration = 3 * time.Second
}
s.pollInterval = duration
return
}
}

func BatchSize(size uint) SluusOpt {
func BatchSize(size uint64) SluusOpt {
return func(s *Sluus) (err error) {
s.batchSize = size
return
Expand All @@ -62,14 +62,32 @@ func DataDir(path string) QueueOpt {

func TableLoadingMode(mode string) QueueOpt {
return func(q *Queue) (err error) {
q.opts.TableLoadingMode = options.FileIO
switch mode {
case "file":
q.opts.TableLoadingMode = options.FileIO
case "memory":
q.opts.TableLoadingMode = options.LoadToRAM
case "mmap":
q.opts.TableLoadingMode = options.MemoryMap
default:
q.opts.TableLoadingMode = options.LoadToRAM
}
return
}
}

func ValueLogLoadingMode(mode string) QueueOpt {
return func(q *Queue) (err error) {
q.opts.ValueLogLoadingMode = options.FileIO
switch mode {
case "file":
q.opts.ValueLogLoadingMode = options.FileIO
case "memory":
q.opts.ValueLogLoadingMode = options.LoadToRAM
case "mmap":
q.opts.ValueLogLoadingMode = options.MemoryMap
default:
q.opts.ValueLogLoadingMode = options.FileIO
}
return
}
}
5 changes: 4 additions & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func New(name string, pluginType plugin.Type) (proc *Processor) {
}

func (p *Processor) Load() (err error) {
p.sluus = NewSluus(p)
p.sluus = NewSluus()

if plug, e := plugin.New(p.Name, p.pluginType); e != nil {
err = errors.Wrap(ErrPluginLoad, e.Error())
Expand Down Expand Up @@ -118,6 +118,9 @@ func (p *Processor) SetLogger(logger *zap.SugaredLogger) {
}

func (p *Processor) Start() (err error) {

p.sluus.Start()

for i := 0; i < runtime.NumCPU(); i++ {

// runner is used to avoid interface dynamic dispatch penalty
Expand Down
8 changes: 6 additions & 2 deletions processor/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"encoding/json"
"os"
"time"

"github.com/awillis/sluus/message"
Expand All @@ -13,7 +14,7 @@ import (
// design influenced by http://www.drdobbs.com/parallel/lock-free-queues/208801974

const (
INPUT byte = 3 << iota
INPUT byte = iota
OUTPUT
REJECT
ACCEPT
Expand Down Expand Up @@ -48,6 +49,9 @@ func (q *Queue) Configure(opts ...QueueOpt) (err error) {
}

func (q *Queue) Initialize() (err error) {
if e := os.MkdirAll(q.opts.Dir, 0755); e != nil {
return e
}
q.db, err = badger.Open(q.opts)
return
}
Expand Down Expand Up @@ -95,7 +99,7 @@ func (q *Queue) Put(prefix byte, batch *message.Batch) (err error) {
return
}

func (q *Queue) Get(prefix byte, batchSize uint) (batch *message.Batch, err error) {
func (q *Queue) Get(prefix byte, batchSize uint64) (batch *message.Batch, err error) {

if q.Size() == 0 {
return // no data, no error
Expand Down
Loading

0 comments on commit 02e3fe1

Please sign in to comment.