diff --git a/core/logger.go b/core/logger.go index 438f70c..44bb91d 100644 --- a/core/logger.go +++ b/core/logger.go @@ -1,7 +1,6 @@ package core import ( - "fmt" "os" "runtime" "strings" @@ -29,8 +28,6 @@ func LogConfig(name string, id string) *zap.Config { if name == "core" { fields["pid"] = id - } else { - basename = name + "-" + id } logfile := new(strings.Builder) @@ -42,7 +39,6 @@ func LogConfig(name string, id string) *zap.Config { logfile.WriteRune(os.PathSeparator) logfile.WriteString(basename) logfile.WriteString(".log") - fmt.Println(logfile.String()) //fmt.Println(filepath.FromSlash(filepath.Clean(logfile.String()))) diff --git a/message/batch.go b/message/batch.go index 9b3b914..a1d3991 100644 --- a/message/batch.go +++ b/message/batch.go @@ -12,7 +12,7 @@ type Batch struct { CancelIter context.CancelFunc } -func NewBatch(size uint) *Batch { +func NewBatch(size uint64) *Batch { return &Batch{ msgs: make([]*Message, 0, size), } diff --git a/pipeline/assemble.go b/pipeline/assemble.go index fca94e0..a9f8a1c 100644 --- a/pipeline/assemble.go +++ b/pipeline/assemble.go @@ -52,7 +52,7 @@ func assembleConfig(config Config) (pipe *Pipe) { attachProcessorToPipe(pipe, config.RejectSink, plugin.SINK) attachProcessorToPipe(pipe, config.AcceptSink, plugin.SINK) - pipe.Configure() + pipe.ConfigureAndInitialize(config.Pipe) return } diff --git a/pipeline/config.go b/pipeline/config.go index 75745ce..7c3c408 100644 --- a/pipeline/config.go +++ b/pipeline/config.go @@ -20,12 +20,20 @@ type ( Config struct { Name string + Pipe PipeConfig Source ProcessorConfig AcceptSink ProcessorConfig RejectSink ProcessorConfig Conduit []ProcessorConfig } + PipeConfig struct { + PollInterval uint64 `toml:"poll_interval"` + BatchSize uint64 `toml:"batch_size"` + TableLoadingMode string `toml:"table_loading_mode"` + ValueLogLoadingMode string `toml:"value_log_loading_mode"` + } + ProcessorConfig struct { Plugin string `toml:"plugin"` Option OptionMap `toml:"option"` @@ -63,6 +71,20 @@ func ReadConfigurationFile(filename string) (config Config, err error) { return config, errors.Wrap(ErrConfigValue, "pipeline name") } + // pipe + pipe := tree.Get("pipe") + switch pipe.(type) { + case *toml.Tree: + if pipeConf, ok := pipe.(*toml.Tree); ok { + if e := pipeConf.Unmarshal(&config.Pipe); e != nil { + return config, e + } + } + default: + pos := tree.Position() + return config, errors.Wrapf(ErrConfigSection, "pipe at line %d, column %d", pos.Line, pos.Col) + } + // source source := tree.Get("source") switch source.(type) { diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 7a1acea..0b7a04a 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -2,8 +2,12 @@ package pipeline import ( "errors" + "fmt" "github.com/google/uuid" "go.uber.org/zap" + "os" + "strings" + "time" "github.com/awillis/sluus/core" "github.com/awillis/sluus/plugin" @@ -17,10 +21,12 @@ var ( type ( Component struct { + id uint next *Component pipe *Pipe Value processor.Interface } + Pipe struct { Id string Name string @@ -107,31 +113,61 @@ func (p *Pipe) Attach(component *Component) { for n := &p.root; n != component; n = n.Next() { if n.Next() == nil { component.Value.SetLogger(p.logger) - if e := component.Value.Initialize(); e != nil { - p.Logger().Error(e) - } p.len++ + component.id = p.len n.pipe = p n.next = component } } } -func (p *Pipe) Configure() { +func (p *Pipe) ConfigureAndInitialize(pipeConf PipeConfig) { + reject := processor.Reject(p.Reject().Value.Sluus().Input()) accept := processor.Accept(p.Accept().Value.Sluus().Input()) + pollIntvl := processor.PollInterval(time.Duration(pipeConf.PollInterval) * time.Second) + batchSize := processor.BatchSize(pipeConf.BatchSize) + tableMode := processor.TableLoadingMode(pipeConf.TableLoadingMode) + valueMode := processor.ValueLogLoadingMode(pipeConf.ValueLogLoadingMode) + + for n := p.Source(); n != nil; n = n.Next() { + + if err := n.Value.Sluus().Configure( + reject, + accept, + pollIntvl, + batchSize, + ); err != nil { + p.Logger().Error(err) + } + + dir := dataDirBuilder(p.Name) + + dir.WriteString(fmt.Sprintf("%d-%s-%s", + n.id, + plugin.TypeName(n.Value.Plugin().Type()), + n.Value.Plugin().Name())) - for n := &p.root; n.Next() == nil; n = n.Next() { - if err := n.Value.Sluus().Configure(reject, accept); err != nil { + dataDir := processor.DataDir(dir.String()) + + if err := n.Value.Sluus().Queue().Configure( + dataDir, + tableMode, + valueMode, + ); err != nil { p.Logger().Error(err) } - if n.Next() != p.Accept() { + if n != p.Accept() { input := processor.Input(n.Value.Sluus().Output()) if err := n.Next().Value.Sluus().Configure(input); err != nil { p.Logger().Error(err) } } + + if e := n.Value.Initialize(); e != nil { + p.Logger().Error(e) + } } } @@ -171,3 +207,12 @@ func (p *Pipe) Add(proc processor.Interface) (err error) { p.Attach(component) return } + +func dataDirBuilder(pipeName string) (dirpath *strings.Builder) { + dirpath = new(strings.Builder) + dirpath.WriteString(core.DATADIR) + dirpath.WriteRune(os.PathSeparator) + dirpath.WriteString(pipeName) + dirpath.WriteRune(os.PathSeparator) + return +} diff --git a/plugin/tcp/common.go b/plugin/tcp/common.go index 88a7164..04490b3 100644 --- a/plugin/tcp/common.go +++ b/plugin/tcp/common.go @@ -19,7 +19,7 @@ type options struct { // TCP port number to listen on port int // batch size - batchSize int + batchSize uint64 // application buffer size bufferSize int // OS socket buffer size, a portion of which will be allocated for the app diff --git a/plugin/tcp/source.go b/plugin/tcp/source.go index d3958ae..f0bf934 100644 --- a/plugin/tcp/source.go +++ b/plugin/tcp/source.go @@ -162,7 +162,7 @@ func (s *Source) handleConnection(conn *net.TCPConn) { func (s *Source) Collector() { - batch := message.NewBatch(uint(s.opts.batchSize)) + batch := message.NewBatch(s.opts.batchSize) for { select { @@ -175,7 +175,7 @@ func (s *Source) Collector() { s.Logger().Error(err) } } else { - b := message.NewBatch(uint(batch.Count())) + b := message.NewBatch(batch.Count()) for msg := range batch.Iter() { _ = b.Add(msg) } diff --git a/processor/option.go b/processor/option.go index 9446c18..dcfed16 100644 --- a/processor/option.go +++ b/processor/option.go @@ -37,15 +37,15 @@ func Accept(accept *ring.RingBuffer) SluusOpt { func PollInterval(duration time.Duration) SluusOpt { return func(s *Sluus) (err error) { - if duration < time.Second { - duration = time.Second + if duration < 3*time.Second { + duration = 3 * time.Second } s.pollInterval = duration return } } -func BatchSize(size uint) SluusOpt { +func BatchSize(size uint64) SluusOpt { return func(s *Sluus) (err error) { s.batchSize = size return @@ -62,14 +62,32 @@ func DataDir(path string) QueueOpt { func TableLoadingMode(mode string) QueueOpt { return func(q *Queue) (err error) { - q.opts.TableLoadingMode = options.FileIO + switch mode { + case "file": + q.opts.TableLoadingMode = options.FileIO + case "memory": + q.opts.TableLoadingMode = options.LoadToRAM + case "mmap": + q.opts.TableLoadingMode = options.MemoryMap + default: + q.opts.TableLoadingMode = options.LoadToRAM + } return } } func ValueLogLoadingMode(mode string) QueueOpt { return func(q *Queue) (err error) { - q.opts.ValueLogLoadingMode = options.FileIO + switch mode { + case "file": + q.opts.ValueLogLoadingMode = options.FileIO + case "memory": + q.opts.ValueLogLoadingMode = options.LoadToRAM + case "mmap": + q.opts.ValueLogLoadingMode = options.MemoryMap + default: + q.opts.ValueLogLoadingMode = options.FileIO + } return } } diff --git a/processor/processor.go b/processor/processor.go index 496a891..c21fe26 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -69,7 +69,7 @@ func New(name string, pluginType plugin.Type) (proc *Processor) { } func (p *Processor) Load() (err error) { - p.sluus = NewSluus(p) + p.sluus = NewSluus() if plug, e := plugin.New(p.Name, p.pluginType); e != nil { err = errors.Wrap(ErrPluginLoad, e.Error()) @@ -118,6 +118,9 @@ func (p *Processor) SetLogger(logger *zap.SugaredLogger) { } func (p *Processor) Start() (err error) { + + p.sluus.Start() + for i := 0; i < runtime.NumCPU(); i++ { // runner is used to avoid interface dynamic dispatch penalty diff --git a/processor/queue.go b/processor/queue.go index fc2b16d..a11971a 100644 --- a/processor/queue.go +++ b/processor/queue.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "encoding/json" + "os" "time" "github.com/awillis/sluus/message" @@ -13,7 +14,7 @@ import ( // design influenced by http://www.drdobbs.com/parallel/lock-free-queues/208801974 const ( - INPUT byte = 3 << iota + INPUT byte = iota OUTPUT REJECT ACCEPT @@ -48,6 +49,9 @@ func (q *Queue) Configure(opts ...QueueOpt) (err error) { } func (q *Queue) Initialize() (err error) { + if e := os.MkdirAll(q.opts.Dir, 0755); e != nil { + return e + } q.db, err = badger.Open(q.opts) return } @@ -95,7 +99,7 @@ func (q *Queue) Put(prefix byte, batch *message.Batch) (err error) { return } -func (q *Queue) Get(prefix byte, batchSize uint) (batch *message.Batch, err error) { +func (q *Queue) Get(prefix byte, batchSize uint64) (batch *message.Batch, err error) { if q.Size() == 0 { return // no data, no error diff --git a/processor/sluus.go b/processor/sluus.go index f34167b..767ab6e 100644 --- a/processor/sluus.go +++ b/processor/sluus.go @@ -11,23 +11,29 @@ import ( type ( Sluus struct { - batchSize uint + batchSize uint64 inCtr, outCtr uint64 pollInterval time.Duration wg *sync.WaitGroup queue *Queue - ring map[byte]*ring.RingBuffer + ring sluusRing logger *zap.SugaredLogger } - SluusOpt func(*Sluus) error + sluusRing map[byte]*ring.RingBuffer + SluusOpt func(*Sluus) error ) -func NewSluus(proc Interface) (s *Sluus) { +func NewSluus() (sluus *Sluus) { return &Sluus{ wg: new(sync.WaitGroup), queue: NewQueue(), - ring: make(map[byte]*ring.RingBuffer), + ring: sluusRing{ + INPUT: new(ring.RingBuffer), + OUTPUT: new(ring.RingBuffer), + REJECT: new(ring.RingBuffer), + ACCEPT: new(ring.RingBuffer), + }, } } @@ -42,19 +48,16 @@ func (s *Sluus) Configure(opts ...SluusOpt) (err error) { } func (s *Sluus) Initialize() (err error) { + return s.queue.Initialize() +} - if e := s.queue.Initialize(); e != nil { - return e - } - +func (s *Sluus) Start() { for i := 0; i < runtime.NumCPU(); i++ { go s.inputIO() go s.outputIO(OUTPUT) go s.outputIO(REJECT) go s.outputIO(ACCEPT) } - - return } func (s *Sluus) Logger() *zap.SugaredLogger { @@ -65,26 +68,31 @@ func (s *Sluus) SetLogger(logger *zap.SugaredLogger) { s.logger = logger } -// Input() is used by the pipeline to connect processors together +// Input() is used during pipeling assembly func (s *Sluus) Input() *ring.RingBuffer { return s.ring[INPUT] } -// Output() is used by the pipeline to connect processors together +// Output() is used during pipeling assembly func (s *Sluus) Output() *ring.RingBuffer { return s.ring[OUTPUT] } -// Reject() is used by the pipeline to connect processors together +// Reject() is used during pipeling assembly func (s *Sluus) Reject() *ring.RingBuffer { return s.ring[REJECT] } -// Accept() is used by the pipeline to connect processors together +// Accept() is used during pipeling assembly func (s *Sluus) Accept() *ring.RingBuffer { return s.ring[ACCEPT] } +// Queue() is used during pipeling assembly +func (s *Sluus) Queue() *Queue { + return s.queue +} + func (s *Sluus) shutdown() { for i := range s.ring { @@ -160,7 +168,7 @@ func (s *Sluus) outputIO(prefix byte) { break } - batch, err := s.queue.Get(prefix, uint(r.Cap())) + batch, err := s.queue.Get(prefix, r.Cap()) if err != nil { s.logger.Error(err) }