Skip to content

Commit

Permalink
fix(agent): Use a unique WAL file for plugin instances of the same ty…
Browse files Browse the repository at this point in the history
…pe (#15966)

(cherry picked from commit e257c14)
  • Loading branch information
DStrand1 authored and srebhan committed Oct 28, 2024
1 parent 562ced1 commit 27d2a79
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ The agent table configures Telegraf and the defaults used across all plugins.

- **buffer_directory**:
The directory to use when in `disk` buffer mode. Each output plugin will make
another subdirectory in this directory with the output plugin's name.
another subdirectory in this directory with the output plugin's ID.

## Plugins

Expand Down
4 changes: 2 additions & 2 deletions models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type BufferStats struct {
}

// NewBuffer returns a new empty Buffer with the given capacity.
func NewBuffer(name string, alias string, capacity int, strategy string, path string) (Buffer, error) {
func NewBuffer(name, id, alias string, capacity int, strategy, path string) (Buffer, error) {
registerGob()

bs := NewBufferStats(name, alias, capacity)
Expand All @@ -58,7 +58,7 @@ func NewBuffer(name string, alias string, capacity int, strategy string, path st
case "", "memory":
return NewMemoryBuffer(capacity, bs)
case "disk":
return NewDiskBuffer(name, path, bs)
return NewDiskBuffer(name, id, path, bs)
}
return nil, fmt.Errorf("invalid buffer strategy %q", strategy)
}
Expand Down
11 changes: 9 additions & 2 deletions models/buffer_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,19 @@ type DiskBuffer struct {
isEmpty bool
}

func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, error) {
filePath := filepath.Join(path, name)
func NewDiskBuffer(name, id, path string, stats BufferStats) (*DiskBuffer, error) {
filePath := filepath.Join(path, id)
walFile, err := wal.Open(filePath, nil)
if err != nil {
return nil, fmt.Errorf("failed to open wal file: %w", err)
}
//nolint:errcheck // cannot error here
if index, _ := walFile.FirstIndex(); index == 0 {
// simple way to test if the walfile is freshly initialized, meaning no existing file was found
log.Printf("I! WAL file not found for plugin outputs.%s (%s), "+
"this can safely be ignored if you added this plugin instance for the first time", name, id)
}

buf := &DiskBuffer{
BufferStats: stats,
file: walFile,
Expand Down
3 changes: 2 additions & 1 deletion models/buffer_disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func newTestDiskBuffer(t testing.TB) Buffer {

func newTestDiskBufferWithPath(t testing.TB, name string, path string) Buffer {
t.Helper()
buf, err := NewBuffer(name, "", 0, "disk", path)
buf, err := NewBuffer(name, "123", "", 0, "disk", path)
require.NoError(t, err)
buf.Stats().MetricsAdded.Set(0)
buf.Stats().MetricsWritten.Set(0)
Expand All @@ -45,6 +45,7 @@ func TestBuffer_RetainsTrackingInformation(t *testing.T) {
func TestBuffer_TrackingDroppedFromOldWal(t *testing.T) {
path, err := os.MkdirTemp("", "*-buffer-test")
require.NoError(t, err)
path = filepath.Join(path, "123")
walfile, err := wal.Open(path, nil)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion models/buffer_mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func newTestMemoryBuffer(t testing.TB, capacity int) Buffer {
t.Helper()
buf, err := NewBuffer("test", "", capacity, "memory", "")
buf, err := NewBuffer("test", "123", "", capacity, "memory", "")
require.NoError(t, err)
buf.Stats().MetricsAdded.Set(0)
buf.Stats().MetricsWritten.Set(0)
Expand Down
2 changes: 1 addition & 1 deletion models/buffer_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func MetricTime(sec int64) telegraf.Metric {

func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer {
s.T().Helper()
buf, err := NewBuffer("test", "", capacity, s.bufferType, s.bufferPath)
buf, err := NewBuffer("test", "123", "", capacity, s.bufferType, s.bufferPath)
s.Require().NoError(err)
buf.Stats().MetricsAdded.Set(0)
buf.Stats().MetricsWritten.Set(0)
Expand Down
2 changes: 1 addition & 1 deletion models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NewRunningOutput(
batchSize = DefaultMetricBatchSize
}

b, err := NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory)
b, err := NewBuffer(config.Name, config.ID, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 27d2a79

Please sign in to comment.