From f352d1b52cd30f9feb2604c30d13a60aad4766ce Mon Sep 17 00:00:00 2001 From: Vadim Alekseev Date: Fri, 15 Sep 2023 11:16:36 +0300 Subject: [PATCH] Fix stuck --- e2e/split_join/split_join.go | 16 +++++----------- pipeline/batch.go | 7 ++----- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/e2e/split_join/split_join.go b/e2e/split_join/split_join.go index 637ac9f2d..4a4b254d2 100644 --- a/e2e/split_join/split_join.go +++ b/e2e/split_join/split_join.go @@ -7,7 +7,6 @@ import ( "path" "path/filepath" "strings" - "sync" "testing" "time" @@ -87,28 +86,23 @@ func (c *Config) Validate(t *testing.T) { strBuilder := strings.Builder{} gotEvents := 0 - wg := sync.WaitGroup{} - wg.Add(expectedEventsCount) + done := make(chan struct{}) go func() { r.NoError(c.consumer.Consume(ctx, []string{c.topic}, handlerFunc(func(msg *sarama.ConsumerMessage) { strBuilder.Write(msg.Value) strBuilder.WriteString("\n") gotEvents++ - wg.Done() + if gotEvents == expectedEventsCount { + close(done) + } }))) }() - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - select { case <-done: case <-ctx.Done(): - r.Failf("test timed out", "got: %v, expected: %v", gotEvents, expectedEventsCount) + r.Failf("test timed out", "got: %v, expected: %v, consumed: %s", gotEvents, expectedEventsCount, strBuilder.String()) } got := strBuilder.String() diff --git a/pipeline/batch.go b/pipeline/batch.go index f425a22b7..1a226fec5 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -77,7 +77,7 @@ func (b *Batch) reset() { } func (b *Batch) append(e *Event) { - if e.IsChildParentKind() { + if !e.IsChildKind() { b.hasIterableEvents = true } @@ -91,10 +91,7 @@ func (b *Batch) updateStatus() BatchStatus { // batch is empty return BatchStatusNotReady } - if !b.hasIterableEvents { - // batch contains only parents - return BatchStatusNotReady - } + switch { case (b.maxSizeCount != 0 && l >= b.maxSizeCount) || (b.maxSizeBytes != 0 && b.maxSizeBytes <= b.eventsSize): b.status = BatchStatusMaxSizeExceeded