Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pkg/stanza] support gzip compressed log files for file log receiver #33406

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8a24d38
poc for compressed file support
bacherfl Jun 5, 2024
97c7357
poc for handling compressed log files in filelogreceiver
bacherfl Jun 5, 2024
042b7a2
remove obsolete change
bacherfl Jun 6, 2024
c0d8789
adapt unit tests and fix behavior for start_at parameter for compress…
bacherfl Jun 6, 2024
cba49af
revert obsolete changes, use SectionReader for gzip compressed files
bacherfl Jun 10, 2024
93d3b11
clean up obsolete changes
bacherfl Jun 10, 2024
138413e
clean up obsolete changes
bacherfl Jun 10, 2024
bdcdb92
revert changes in fingerprint implementation
bacherfl Jun 10, 2024
385d476
revert changes in factory
bacherfl Jun 10, 2024
4db39fb
revert changes in reader
bacherfl Jun 10, 2024
0249681
revert changes in factory
bacherfl Jun 10, 2024
34a2182
use reader.Read method to ensure fingerprint is updated
bacherfl Jun 10, 2024
35ac820
add simple test for reading gzip compressed file
bacherfl Jun 10, 2024
ce10537
add unit test for detecting new logs in compressed files
bacherfl Jun 11, 2024
39b14cc
Merge remote-tracking branch 'origin/main' into feat/2328/filelog-rec…
bacherfl Jun 11, 2024
40f6563
add documentation for compressed log file support
bacherfl Jun 11, 2024
b707ec0
use file handle passed as parameter
bacherfl Jun 11, 2024
a3eb04c
add changelog entry
bacherfl Jun 11, 2024
ddd5a94
Leave internal reader nil if gzip EOF
djaglowski Jun 11, 2024
e566b7f
Merge pull request #1 from djaglowski/gzip-refactor
bacherfl Jun 12, 2024
98c72a4
adapt readme to document compression option
bacherfl Jun 12, 2024
eb3f607
add offset override to avoid invalid gzip header errors after first read
bacherfl Jun 12, 2024
c84e64a
add unit test to verify correct setting of reader offset
bacherfl Jun 12, 2024
0b403a8
Merge branch 'main' into feat/2328/filelog-receiver-compressed-files
bacherfl Jun 13, 2024
bb6708b
Move gzip internal reader creation into ReadToEnd
djaglowski Jun 13, 2024
1bd6f0b
Merge pull request #2 from djaglowski/gzip-reader-readtoend
bacherfl Jun 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/filelog_receiver_compressed_files.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for gzip compressed log files

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [2328]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 2 additions & 0 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Config struct {
FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"`
Header *HeaderConfig `mapstructure:"header,omitempty"`
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
Compression string `mapstructure:"compression,omitempty"`
}

type HeaderConfig struct {
Expand Down Expand Up @@ -166,6 +167,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
Attributes: c.Resolver,
HeaderConfig: hCfg,
DeleteAtEOF: c.DeleteAfterRead,
Compression: c.Compression,
}

var t tracker.Tracker
Expand Down
6 changes: 6 additions & 0 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,12 @@ func (c *Config) withHeader(headerMatchPattern, extractRegex string) *Config {
return c
}

// withGzip is a builder-like helper for quickly setting up support for gzip compressed log files
func (c *Config) withGzip() *Config {
	c.Compression = "gzip"
	return c
}

const mockOperatorType = "mock"

func init() {
Expand Down
66 changes: 66 additions & 0 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package fileconsumer

import (
"compress/gzip"
"context"
"fmt"
"os"
Expand Down Expand Up @@ -1528,3 +1529,68 @@ func symlinkTestCreateLogFile(t *testing.T, tempDir string, fileIdx, numLogLines
temp1.Close()
return tokens
}

// TestReadGzipCompressedLogsFromBeginning verifies that, with start_at set to
// "beginning", every line already present in a gzip compressed file is emitted
// once the operator starts.
func TestReadGzipCompressedLogsFromBeginning(t *testing.T) {
	t.Parallel()

	dir := t.TempDir()
	cfg := NewConfig().includeDir(dir).withGzip()
	cfg.StartAt = "beginning"
	operator, sink := testManager(t, cfg)

	// Write two compressed log lines before the operator is started.
	logFile := filetest.OpenTempWithPattern(t, dir, "*.gz")
	zw := gzip.NewWriter(logFile)
	_, err := zw.Write([]byte("testlog1\ntestlog2\n"))
	require.NoError(t, err)
	require.NoError(t, zw.Close())

	require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
	defer func() {
		require.NoError(t, operator.Stop())
	}()

	// Both pre-existing lines must be picked up from the start of the file.
	sink.ExpectToken(t, []byte("testlog1"))
	sink.ExpectToken(t, []byte("testlog2"))
}

// TestReadGzipCompressedLogsFromEnd tests that, when starting at the end of a gzip compressed file, we
// read all the lines that are added afterward
func TestReadGzipCompressedLogsFromEnd(t *testing.T) {
	t.Parallel()

	tempDir := t.TempDir()
	cfg := NewConfig().includeDir(tempDir).withGzip()
	cfg.StartAt = "end"
	operator, sink := testManager(t, cfg)

	// Create a file, then start
	temp := filetest.OpenTempWithPattern(t, tempDir, "*.gz")

	// appendToLog writes content as an additional gzip stream at the current end
	// of the file; a new gzip.Writer per append produces concatenated members.
	appendToLog := func(t *testing.T, content string) {
		// Report failures at the caller's line, not inside this closure.
		t.Helper()
		writer := gzip.NewWriter(temp)
		_, err := writer.Write([]byte(content))
		require.NoError(t, err)
		require.NoError(t, writer.Close())
	}

	appendToLog(t, "testlog1\ntestlog2\n")

	// poll for the first time - this should not lead to emitted
	// logs as those were already in the existing file
	operator.poll(context.Background())

	// append new content to the log and poll again - this should be picked up
	appendToLog(t, "testlog3\n")
	operator.poll(context.Background())
	sink.ExpectToken(t, []byte("testlog3"))

	// do another iteration to verify correct setting of compressed reader offset
	appendToLog(t, "testlog4\n")
	operator.poll(context.Background())
	sink.ExpectToken(t, []byte("testlog4"))
}
5 changes: 4 additions & 1 deletion pkg/stanza/fileconsumer/internal/reader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Factory struct {
EmitFunc emit.Callback
Attributes attrs.Resolver
DeleteAtEOF bool
Compression string
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
Expand Down Expand Up @@ -73,14 +74,15 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
decoder: decode.New(f.Encoding),
lineSplitFunc: f.SplitFunc,
deleteAtEOF: f.DeleteAtEOF,
compression: f.Compression,
}
r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))

if r.Fingerprint.Len() > r.fingerprintSize {
// User has reconfigured fingerprint_size
shorter, rereadErr := fingerprint.NewFromFile(file, r.fingerprintSize)
if rereadErr != nil {
return nil, fmt.Errorf("reread fingerprint: %w", err)
return nil, fmt.Errorf("reread fingerprint: %w", rereadErr)
}
if !r.Fingerprint.StartsWith(shorter) {
return nil, errors.New("file truncated")
Expand Down Expand Up @@ -119,5 +121,6 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
for k, v := range attributes {
r.FileAttributes[k] = v
}

return r, nil
}
37 changes: 36 additions & 1 deletion pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package reader // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"bufio"
"compress/gzip"
"context"
"errors"
"io"
"os"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -34,6 +36,7 @@ type Reader struct {
set component.TelemetrySettings
fileName string
file *os.File
reader io.Reader
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
fingerprintSize int
initialBufferSize int
maxLogSize int
Expand All @@ -45,10 +48,42 @@ type Reader struct {
emitFunc emit.Callback
deleteAtEOF bool
needsUpdateFingerprint bool
compression string
}

// ReadToEnd will read until the end of the file
func (r *Reader) ReadToEnd(ctx context.Context) {
switch r.compression {
case "gzip":
// We need to create a gzip reader each time ReadToEnd is called because the underlying
// SectionReader can only read a fixed window (from previous offset to EOF).
info, err := r.file.Stat()
if err != nil {
r.set.Logger.Error("Failed to stat", zap.Error(err))
return
}
currentEOF := info.Size()

// use a gzip Reader with an underlying SectionReader to pick up at the last
// offset of a gzip compressed file
gzipReader, err := gzip.NewReader(io.NewSectionReader(r.file, r.Offset, currentEOF))
if err != nil {
if !errors.Is(err, io.EOF) {
r.set.Logger.Error("Failed to create gzip reader", zap.Error(err))
}
return
} else {
r.reader = gzipReader
}
// Offset tracking in an uncompressed file is based on the length of emitted tokens, but in this case
// we need to set the offset to the end of the file.
defer func() {
r.Offset = currentEOF
}()
default:
r.reader = r.file
}

if _, err := r.file.Seek(r.Offset, 0); err != nil {
r.set.Logger.Error("Failed to seek", zap.Error(err))
return
Expand Down Expand Up @@ -154,7 +189,7 @@ func (r *Reader) close() {

// Read from the file and update the fingerprint if necessary
func (r *Reader) Read(dst []byte) (n int, err error) {
n, err = r.file.Read(dst)
n, err = r.reader.Read(dst)
if n == 0 || err != nil {
return
}
Expand Down
Loading