Skip to content

Commit

Permalink
Fix stuck
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Sep 26, 2023
1 parent daf5b15 commit f352d1b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 16 deletions.
16 changes: 5 additions & 11 deletions e2e/split_join/split_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -87,28 +86,23 @@ func (c *Config) Validate(t *testing.T) {

strBuilder := strings.Builder{}
gotEvents := 0
wg := sync.WaitGroup{}
wg.Add(expectedEventsCount)
done := make(chan struct{})

go func() {
r.NoError(c.consumer.Consume(ctx, []string{c.topic}, handlerFunc(func(msg *sarama.ConsumerMessage) {
strBuilder.Write(msg.Value)
strBuilder.WriteString("\n")
gotEvents++
wg.Done()
if gotEvents == expectedEventsCount {
close(done)
}
})))
}()

done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

select {
case <-done:
case <-ctx.Done():
r.Failf("test timed out", "got: %v, expected: %v", gotEvents, expectedEventsCount)
r.Failf("test timed out", "got: %v, expected: %v, consumed: %s", gotEvents, expectedEventsCount, strBuilder.String())
}

got := strBuilder.String()
Expand Down
7 changes: 2 additions & 5 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (b *Batch) reset() {
}

func (b *Batch) append(e *Event) {
if e.IsChildParentKind() {
if !e.IsChildKind() {
b.hasIterableEvents = true
}

Expand All @@ -91,10 +91,7 @@ func (b *Batch) updateStatus() BatchStatus {
// batch is empty
return BatchStatusNotReady
}
if !b.hasIterableEvents {
// batch contains only parents
return BatchStatusNotReady
}

switch {
case (b.maxSizeCount != 0 && l >= b.maxSizeCount) || (b.maxSizeBytes != 0 && b.maxSizeBytes <= b.eventsSize):
b.status = BatchStatusMaxSizeExceeded
Expand Down

0 comments on commit f352d1b

Please sign in to comment.