Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pkg/stanza] Unexport and organize fields on manager struct #12793

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,8 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Input, error)
}

return &Input{
SugaredLogger: logger.With("component", "fileconsumer"),
finder: c.Finder,
PollInterval: c.PollInterval.Raw(),
queuedMatches: make([]string, 0),
firstCheck: true,
cancel: func() {},
knownFiles: make([]*Reader, 0, 10),
roller: newRoller(),
MaxConcurrentFiles: c.MaxConcurrentFiles,
SeenPaths: make(map[string]struct{}, 100),
SugaredLogger: logger.With("component", "fileconsumer"),
cancel: func() {},
readerFactory: readerFactory{
SugaredLogger: logger.With("component", "fileconsumer"),
readerConfig: &readerConfig{
Expand All @@ -137,5 +129,13 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Input, error)
fromBeginning: startAtBeginning,
splitterConfig: c.Splitter,
},
finder: c.Finder,
roller: newRoller(),
pollInterval: c.PollInterval.Raw(),
maxConcurrentFiles: c.MaxConcurrentFiles,
knownFiles: make([]*Reader, 0, 10),
seenPaths: make(map[string]struct{}, 100),
firstCheck: true,
queuedMatches: make([]string, 0),
}, nil
}
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func TestBuild(t *testing.T) {
require.NoError,
func(t *testing.T, f *Input) {
require.Equal(t, f.finder.Include, []string{"/var/log/testpath.*"})
require.Equal(t, f.PollInterval, 10*time.Millisecond)
require.Equal(t, f.pollInterval, 10*time.Millisecond)
},
},
{
Expand Down
32 changes: 15 additions & 17 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,22 @@ type EmitFunc func(ctx context.Context, attrs *FileAttributes, token []byte)
// TODO rename this struct
type Input struct {
*zap.SugaredLogger
finder Finder
PollInterval time.Duration
wg sync.WaitGroup
cancel context.CancelFunc

MaxConcurrentFiles int
SeenPaths map[string]struct{}
readerFactory readerFactory
finder Finder
roller roller
persister operator.Persister

persister operator.Persister
pollInterval time.Duration
maxConcurrentFiles int
maxBatchFiles int

knownFiles []*Reader
seenPaths map[string]struct{}
firstCheck bool
queuedMatches []string
maxBatchFiles int
roller roller

firstCheck bool
wg sync.WaitGroup
cancel context.CancelFunc

readerFactory readerFactory
}

func (f *Input) Start(persister operator.Persister) error {
Expand Down Expand Up @@ -90,7 +88,7 @@ func (f *Input) startPoller(ctx context.Context) {
f.wg.Add(1)
go func() {
defer f.wg.Done()
globTicker := time.NewTicker(f.PollInterval)
globTicker := time.NewTicker(f.pollInterval)
defer globTicker.Stop()

for {
Expand All @@ -107,7 +105,7 @@ func (f *Input) startPoller(ctx context.Context) {

// poll checks all the watched paths for new entries
func (f *Input) poll(ctx context.Context) {
f.maxBatchFiles = f.MaxConcurrentFiles / 2
f.maxBatchFiles = f.maxConcurrentFiles / 2
var matches []string
if len(f.queuedMatches) > f.maxBatchFiles {
matches, f.queuedMatches = f.queuedMatches[:f.maxBatchFiles], f.queuedMatches[f.maxBatchFiles:]
Expand Down Expand Up @@ -166,13 +164,13 @@ func (f *Input) makeReaders(filesPaths []string) []*Reader {
// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(filesPaths))
for _, path := range filesPaths {
if _, ok := f.SeenPaths[path]; !ok {
if _, ok := f.seenPaths[path]; !ok {
if f.readerFactory.fromBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
f.seenPaths[path] = struct{}{}
}
file, err := os.Open(path) // #nosec - operator must read in files defined by user
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions pkg/stanza/operator/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,11 @@ func TestReadExistingAndNewLogs(t *testing.T) {
// we don't read any entries that were in the file before startup
func TestStartAtEnd(t *testing.T) {
t.Parallel()

var pollInterval time.Duration
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *Config) {
cfg.StartAt = "end"
pollInterval = cfg.PollInterval.Raw()
}, nil)

temp := openTemp(t, tempDir)
Expand All @@ -419,7 +422,7 @@ func TestStartAtEnd(t *testing.T) {
require.NoError(t, operator.Stop())
}()

time.Sleep(2 * operator.fileConsumer.PollInterval)
time.Sleep(2 * pollInterval)

expectNoMessages(t, logReceived)

Expand All @@ -433,16 +436,19 @@ func TestStartAtEnd(t *testing.T) {
// beginning
func TestStartAtEndNewFile(t *testing.T) {
t.Parallel()

var pollInterval time.Duration
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *Config) {
cfg.StartAt = "end"
pollInterval = cfg.PollInterval.Raw()
}, nil)

require.NoError(t, operator.Start(testutil.NewMockPersister("test")))
defer func() {
require.NoError(t, operator.Stop())
}()

time.Sleep(2 * operator.fileConsumer.PollInterval)
time.Sleep(2 * pollInterval)

temp := openTemp(t, tempDir)
writeString(t, temp, "testlog1\ntestlog2\n")
Expand Down
16 changes: 16 additions & 0 deletions unreleased/pkg-stanza-fileconsumer-cleanup-names.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza/fileconsumer

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Unexport several fields that are meant for internal usage only

# One or more tracking issues related to the change
issues: [12793]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: