Split plugin #299

Merged
merged 29 commits on Nov 24, 2023

Commits
bcb87ff
Implement remap plugin
vadimalekseev Feb 9, 2023
17924b8
Review fixes
vadimalekseev Feb 10, 2023
9019033
Spawn array of nodes
vadimalekseev Feb 10, 2023
ab4592a
Implement iterator in the batcher
vadimalekseev Feb 15, 2023
2f675a5
The plugin now works with the join plugin
vadimalekseev Feb 15, 2023
0837d53
Set kindStr for the childParent kind
vadimalekseev Feb 15, 2023
263371a
Refactor
vadimalekseev Feb 17, 2023
a77bd3a
Fix wrong memory re-usage
vadimalekseev Feb 17, 2023
a4486b5
Some fixes
vadimalekseev Feb 22, 2023
ee968e1
Eliminate the hanging of the batcher
vadimalekseev Feb 25, 2023
78a6be5
Fix tests
vadimalekseev Mar 2, 2023
4d2932b
Merge branch 'master' into 293-split-plugin
vadimalekseev Mar 2, 2023
b8d09b4
Count parents in the batch
vadimalekseev Mar 3, 2023
fdaebbc
Merge branch 'master' into 293-split-plugin
vadimalekseev Mar 3, 2023
a2daafb
Rename remap -> split
vadimalekseev Mar 3, 2023
a12453c
Merge branch 'master' into 293-split-plugin
vadimalekseev Mar 13, 2023
7c7cb59
Speedup test
vadimalekseev Mar 13, 2023
15178c0
Merge remote-tracking branch 'origin/master' into 293-split-plugin
vadimalekseev Aug 28, 2023
3d9f55e
Fix after merge build errors
vadimalekseev Aug 28, 2023
d0905be
Update tests
vadimalekseev Sep 14, 2023
daf5b15
Update tests
vadimalekseev Sep 14, 2023
f352d1b
Fix stuck
vadimalekseev Sep 15, 2023
5a5d7af
Review fixes
vadimalekseev Sep 26, 2023
0f2aae8
Merge branch 'master' into 293-split-plugin
vadimalekseev Oct 23, 2023
04cea9b
Merge remote-tracking branch 'origin/master' into 293-split-plugin
vadimalekseev Nov 13, 2023
ca6b5b5
Replace batcher iterator with ForEach func
vadimalekseev Nov 16, 2023
cbd0ba5
Merge branch 'master' into 293-split-plugin
vadimalekseev Nov 16, 2023
16c752b
Remove iteratorIndex
vadimalekseev Nov 16, 2023
f7c2948
Comment about decoder
vadimalekseev Nov 24, 2023
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -69,7 +69,7 @@ jobs:
cache: true

- name: Run Kafka
run: docker compose -f ./e2e/kafka_file/docker-compose-kafka.yml up -d
run: docker compose -f ./e2e/kafka_file/docker-compose.yml up -d

- name: Run Clickhouse
run: docker compose -f ./e2e/file_clickhouse/docker-compose.yml up -d
2 changes: 1 addition & 1 deletion README.md
@@ -42,7 +42,7 @@ TBD: throughput on production servers.

**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md)

**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md)
**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)

**Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)

1 change: 1 addition & 0 deletions _sidebar.md
@@ -42,6 +42,7 @@
- [remove_fields](plugin/action/remove_fields/README.md)
- [rename](plugin/action/rename/README.md)
- [set_time](plugin/action/set_time/README.md)
- [split](plugin/action/split/README.md)
- [throttle](plugin/action/throttle/README.md)

- Output
1 change: 1 addition & 0 deletions cmd/file.d/file.d.go
@@ -34,6 +34,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/remove_fields"
_ "github.com/ozontech/file.d/plugin/action/rename"
_ "github.com/ozontech/file.d/plugin/action/set_time"
_ "github.com/ozontech/file.d/plugin/action/split"
_ "github.com/ozontech/file.d/plugin/action/throttle"
_ "github.com/ozontech/file.d/plugin/input/dmesg"
_ "github.com/ozontech/file.d/plugin/input/fake"
22 changes: 22 additions & 0 deletions e2e/split_join/config.yml
@@ -0,0 +1,22 @@
pipelines:
split_join:
settings:
event_timeout: 1h
capacity: 128
input:
type: file
offsets_op: reset
maintenance_interval: 1m
actions:
- type: debug
message: input event sample
- type: split
field: data
- type: join
field: message
start: '/^start/'
continue: '/^continue/'
- type: debug
message: output event sample
output:
type: kafka
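The config above chains the new split action with the existing join action: split fans the `data` array out into one event per element, and join then re-assembles the `message` events that match the start/continue patterns. A minimal Go sketch of the split step follows — this is an illustration of the concept only, not the plugin's actual implementation (the helper name `splitEvents` is hypothetical):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// splitEvents sketches what the split action does conceptually: it takes
// the JSON array under `field` and yields one child event per element.
// Illustration only — not the plugin's real implementation.
func splitEvents(raw []byte, field string) ([]string, error) {
	var root map[string]json.RawMessage
	if err := json.Unmarshal(raw, &root); err != nil {
		return nil, err
	}
	var items []json.RawMessage
	if err := json.Unmarshal(root[field], &items); err != nil {
		return nil, err
	}
	out := make([]string, 0, len(items))
	for _, it := range items {
		// json.RawMessage preserves the element's original bytes.
		out = append(out, string(it))
	}
	return out, nil
}

func main() {
	sample := `{"data":[{"first":"1"},{"message":"start"}]}`
	events, err := splitEvents([]byte(sample), "data")
	if err != nil {
		panic(err)
	}
	for _, e := range events {
		fmt.Println(e)
	}
	// prints:
	// {"first":"1"}
	// {"message":"start"}
}
```

After splitting, each element flows through the rest of the action chain as an independent event, which is why the subsequent join action can operate on the `message` fields.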
19 changes: 19 additions & 0 deletions e2e/split_join/handler.go
@@ -0,0 +1,19 @@
package split_join

import (
"github.com/Shopify/sarama"
)

type handlerFunc func(message *sarama.ConsumerMessage)

func (h handlerFunc) Setup(_ sarama.ConsumerGroupSession) error { return nil }

func (h handlerFunc) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h handlerFunc) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
h(msg)
session.MarkMessage(msg, "")
}
return nil
}
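`handlerFunc` uses a common Go idiom: a named function type that satisfies an interface (here `sarama.ConsumerGroupHandler`) by calling itself, so the test can pass a closure instead of declaring a one-off struct. A self-contained sketch of the same pattern, with a hypothetical `Visitor` interface standing in for sarama's:

```go
package main

import "fmt"

// Visitor is a small stand-in for an interface like sarama.ConsumerGroupHandler.
type Visitor interface {
	Visit(msg string)
}

// visitorFunc mirrors the handlerFunc trick above: a function type that
// implements the interface by invoking itself.
type visitorFunc func(msg string)

func (f visitorFunc) Visit(msg string) { f(msg) }

func consume(v Visitor, msgs []string) {
	for _, m := range msgs {
		v.Visit(m)
	}
}

func main() {
	var got []string
	// A closure is adapted into a Visitor with a plain conversion.
	consume(visitorFunc(func(m string) { got = append(got, m) }), []string{"a", "b"})
	fmt.Println(got) // [a b]
}
```

The same conversion is what lets `split_join.go` pass an inline message callback straight into `consumer.Consume`.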
120 changes: 120 additions & 0 deletions e2e/split_join/split_join.go
@@ -0,0 +1,120 @@
package split_join

import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/ozontech/file.d/cfg"
"github.com/stretchr/testify/require"
)

const (
brokerHost = "localhost:9092"
group = "file_d_test_split_join_client"

arrayLen = 4
sample = `{ "data": [ { "first": "1" }, { "message": "start " }, { "message": "continue" }, { "second": "2" }, { "third": "3" } ] }`

messages = 10
)

type Config struct {
inputDir string
consumer sarama.ConsumerGroup
topic string
}

func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
r := require.New(t)

c.inputDir = t.TempDir()
offsetsDir := t.TempDir()
c.topic = fmt.Sprintf("file_d_test_split_join_%d", time.Now().UnixNano())
t.Logf("generated topic: %s", c.topic)

input := conf.Pipelines[pipelineName].Raw.Get("input")
input.Set("watching_dir", c.inputDir)
input.Set("filename_pattern", "input.log")
input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))

output := conf.Pipelines[pipelineName].Raw.Get("output")
output.Set("brokers", []string{brokerHost})
output.Set("default_topic", c.topic)

addrs := []string{brokerHost}
config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest

admin, err := sarama.NewClusterAdmin(addrs, config)
r.NoError(err)
r.NoError(admin.CreateTopic(c.topic, &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
}, false))

c.consumer, err = sarama.NewConsumerGroup(addrs, group, config)
r.NoError(err)
}

func (c *Config) Send(t *testing.T) {
file, err := os.Create(path.Join(c.inputDir, "input.log"))
require.NoError(t, err)
defer func(file *os.File) {
_ = file.Close()
}(file)

for i := 0; i < messages; i++ {
_, err = file.WriteString(sample + "\n")
require.NoError(t, err)
}
}

func (c *Config) Validate(t *testing.T) {
r := require.New(t)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

expectedEventsCount := messages * arrayLen

strBuilder := strings.Builder{}
gotEvents := 0
done := make(chan struct{})

go func() {
r.NoError(c.consumer.Consume(ctx, []string{c.topic}, handlerFunc(func(msg *sarama.ConsumerMessage) {
strBuilder.Write(msg.Value)
strBuilder.WriteString("\n")
gotEvents++
if gotEvents == expectedEventsCount {
close(done)
}
})))
}()

select {
case <-done:
case <-ctx.Done():
r.Failf("test timed out", "got: %v, expected: %v, consumed: %s", gotEvents, expectedEventsCount, strBuilder.String())
}

got := strBuilder.String()

expected := strings.Repeat(`{"first":"1"}
{"message":"start continue"}
{"second":"2"}
{"third":"3"}
`,
messages)

r.Equal(len(expected), len(got))
r.Equal(expected, got)
r.Equal(expectedEventsCount, gotEvents)
}
7 changes: 7 additions & 0 deletions e2e/start_work_test.go
@@ -15,6 +15,7 @@ import (
"github.com/ozontech/file.d/e2e/http_file"
"github.com/ozontech/file.d/e2e/join_throttle"
"github.com/ozontech/file.d/e2e/kafka_file"
"github.com/ozontech/file.d/e2e/split_join"
"github.com/ozontech/file.d/fd"
_ "github.com/ozontech/file.d/plugin/action/add_host"
_ "github.com/ozontech/file.d/plugin/action/convert_date"
@@ -34,6 +35,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/remove_fields"
_ "github.com/ozontech/file.d/plugin/action/rename"
_ "github.com/ozontech/file.d/plugin/action/set_time"
_ "github.com/ozontech/file.d/plugin/action/split"
_ "github.com/ozontech/file.d/plugin/action/throttle"
_ "github.com/ozontech/file.d/plugin/input/dmesg"
_ "github.com/ozontech/file.d/plugin/input/fake"
@@ -109,6 +111,11 @@ func TestE2EStabilityWorkCase(t *testing.T) {
},
cfgPath: "./join_throttle/config.yml",
},
{
name: "split_join",
e2eTest: &split_join.Config{},
cfgPath: "./split_join/config.yml",
},
{
name: "file_clickhouse",
e2eTest: &file_clickhouse.Config{},
55 changes: 43 additions & 12 deletions pipeline/batch.go
@@ -19,7 +19,9 @@ const (
)

type Batch struct {
Events []*Event
events []*Event
// hasIterableEvents is true if the Batch contains at least one iterable event
hasIterableEvents bool

// eventsSize contains total size of the Events in bytes
eventsSize int
@@ -34,6 +36,13 @@ type Batch struct {
status BatchStatus
}

func NewPreparedBatch(events []*Event) *Batch {
b := &Batch{}
b.reset()
b.events = events
return b
}

func newBatch(maxSizeCount, maxSizeBytes int, timeout time.Duration) *Batch {
if maxSizeCount < 0 {
logger.Fatalf("why batch max count less than 0?")
@@ -45,30 +54,41 @@ func newBatch(maxSizeCount, maxSizeBytes int, timeout time.Duration) *Batch {
logger.Fatalf("batch limits are not set")
}

return &Batch{
b := &Batch{
maxSizeCount: maxSizeCount,
maxSizeBytes: maxSizeBytes,
timeout: timeout,
Events: make([]*Event, 0, maxSizeCount),
events: make([]*Event, 0, maxSizeCount),
}
b.reset()

return b
}

func (b *Batch) reset() {
b.Events = b.Events[:0]
b.events = b.events[:0]
b.eventsSize = 0
b.status = BatchStatusNotReady
b.hasIterableEvents = false
b.startTime = time.Now()
}

func (b *Batch) append(e *Event) {
b.Events = append(b.Events, e)
b.hasIterableEvents = b.hasIterableEvents || !e.IsChildParentKind()

b.events = append(b.events, e)
b.eventsSize += e.Size
}

func (b *Batch) updateStatus() BatchStatus {
l := len(b.Events)
l := len(b.events)
if len(b.events) == 0 {
// batch is empty
return BatchStatusNotReady
}

switch {
case (b.maxSizeCount != 0 && l == b.maxSizeCount) || (b.maxSizeBytes != 0 && b.maxSizeBytes <= b.eventsSize):
case (b.maxSizeCount != 0 && l >= b.maxSizeCount) || (b.maxSizeBytes != 0 && b.maxSizeBytes <= b.eventsSize):
b.status = BatchStatusMaxSizeExceeded
case l > 0 && time.Since(b.startTime) > b.timeout:
b.status = BatchStatusTimeoutExceeded
@@ -78,6 +98,15 @@ func (b *Batch) updateStatus() BatchStatus {
return b.status
}

func (b *Batch) ForEach(cb func(event *Event)) {
for _, event := range b.events {
if event.IsChildParentKind() {
continue
}
cb(event)
}
}

type Batcher struct {
opts BatcherOptions

@@ -171,9 +200,11 @@ func (b *Batcher) work() {
for batch := range b.fullBatches {
b.workersInProgress.Inc()

now := time.Now()
b.opts.OutFn(&data, batch)
b.batchOutFnSeconds.Observe(time.Since(now).Seconds())
if batch.hasIterableEvents {
now := time.Now()
b.opts.OutFn(&data, batch)
b.batchOutFnSeconds.Observe(time.Since(now).Seconds())
}

status := b.commitBatch(batch)

@@ -213,8 +244,8 @@ func (b *Batcher) commitBatch(batch *Batch) BatchStatus {
b.commitSeq++
b.commitWaitingSeconds.Observe(time.Since(now).Seconds())

for i := range batch.Events {
b.opts.Controller.Commit(batch.Events[i])
for i := range batch.events {
b.opts.Controller.Commit(batch.events[i])
}

status := batch.status
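The batch.go changes make `Batch.events` private and route all output-side iteration through the new `ForEach`, which skips child-parent events (the split parents that carry no payload of their own). A hedged, self-contained sketch of the new shape — the types below are minimal stand-ins for the pipeline's real `Event` and `Batch`, reduced to what this PR touches:

```go
package main

import "fmt"

// Event is a minimal stand-in for pipeline.Event, illustration only.
type Event struct {
	kindChildParent bool
	payload         string
}

func (e *Event) IsChildParentKind() bool { return e.kindChildParent }

// Batch is a minimal stand-in for pipeline.Batch with private events.
type Batch struct{ events []*Event }

// ForEach visits only iterable events, skipping child-parent placeholders,
// mirroring the method added in this PR.
func (b *Batch) ForEach(cb func(*Event)) {
	for _, e := range b.events {
		if e.IsChildParentKind() {
			continue
		}
		cb(e)
	}
}

func main() {
	batch := &Batch{events: []*Event{
		{payload: "a"},
		{kindChildParent: true}, // split parent: skipped by ForEach
		{payload: "b"},
	}}

	// Before this PR an output plugin ranged over the exported batch.Events;
	// now it calls ForEach and never sees child-parent events, while
	// commitBatch still commits every event, parents included.
	var out []string
	batch.ForEach(func(e *Event) { out = append(out, e.payload) })
	fmt.Println(out) // [a b]
}
```

This split of responsibilities is also why `work()` can skip `OutFn` entirely when `hasIterableEvents` is false: such a batch has nothing to ship, but its events still need committing.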