Skip to content

Commit

Permalink
fix: fix a bug where AppendRequest with no entries triggers flush (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana authored Jul 26, 2024
1 parent cfa3479 commit 8a3ae22
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/storage/wal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
s.w.Append(r.TenantID, r.LabelsStr, r.Labels, r.Entries, m.clock.Now())
// If the segment exceeded the maximum age or the maximum size, move s to
// the closed list to be flushed.
if m.clock.Since(s.w.firstAppend) >= m.cfg.MaxAge || s.w.InputSize() >= m.cfg.MaxSegmentSize {
if s.w.Age(m.clock.Now()) >= m.cfg.MaxAge || s.w.InputSize() >= m.cfg.MaxSegmentSize {
m.move(el, s)
}
return s.r, nil
Expand Down Expand Up @@ -224,7 +224,7 @@ func (m *Manager) move(el *list.Element, s *segment) {
func (m *Manager) moveFrontIfExpired() bool {
if el := m.available.Front(); el != nil {
s := el.Value.(*segment)
if !s.w.firstAppend.IsZero() && m.clock.Since(s.w.firstAppend) >= m.cfg.MaxAge {
if s.w.Age(m.clock.Now()) >= m.cfg.MaxAge {
m.move(el, s)
return true
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/storage/wal/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,38 @@ func TestManager_Append(t *testing.T) {
require.NoError(t, res.Err())
}

func TestManager_AppendNoEntries(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Append no entries.
lbs := labels.Labels{{Name: "a", Value: "b"}}
res, err := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: []*logproto.Entry{},
})
require.NoError(t, err)
require.NotNil(t, res)

// The data hasn't been flushed, so reading from Done() should block.
select {
case <-res.Done():
t.Fatal("unexpected closed Done()")
default:
}

// The segment that was just appended to has neither reached the maximum
// age nor maximum size to be flushed.
require.Equal(t, 1, m.available.Len())
require.Equal(t, 0, m.pending.Len())
}

func TestManager_AppendFailed(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 30 * time.Second,
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/wal/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func NewWalSegmentWriter() (*SegmentWriter, error) {

// Age returns the age of the segment.
func (b *SegmentWriter) Age(now time.Time) time.Duration {
if b.firstAppend.IsZero() {
return 0
}
return now.Sub(b.firstAppend)
}

Expand Down

0 comments on commit 8a3ae22

Please sign in to comment.