Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Move gzip internal reader creation into ReadToEnd #2

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 1 addition & 27 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ package reader // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"bufio"
"compress/gzip"
"errors"
"fmt"
"io"
"os"
"time"

Expand Down Expand Up @@ -76,6 +74,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
decoder: decode.New(f.Encoding),
lineSplitFunc: f.SplitFunc,
deleteAtEOF: f.DeleteAtEOF,
compression: f.Compression,
}
r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))

Expand Down Expand Up @@ -123,30 +122,5 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
r.FileAttributes[k] = v
}

switch f.Compression {
case "gzip":
info, err := r.file.Stat()
if err != nil {
return nil, fmt.Errorf("gzip stat: %w", err)
}

// use a gzip Reader with an underlying SectionReader to pick up at the last
// offset of a gzip compressed file
gzipReader, err := gzip.NewReader(io.NewSectionReader(file, r.Offset, info.Size()))
if err != nil {
if !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("new gzip: %w", err)
}
} else {
r.reader = gzipReader
}
r.deferFunc = func() {
// set the offset of the reader to the end of the file to ensure the offset is not set to the
// position of the scanner, which operates on the uncompressed data
r.Offset = info.Size()
}
default:
r.reader = file
}
return r, nil
}
38 changes: 31 additions & 7 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package reader // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"bufio"
"compress/gzip"
"context"
"errors"
"io"
Expand Down Expand Up @@ -41,20 +42,46 @@ type Reader struct {
maxLogSize int
lineSplitFunc bufio.SplitFunc
splitFunc bufio.SplitFunc
deferFunc func()
decoder *decode.Decoder
headerReader *header.Reader
processFunc emit.Callback
emitFunc emit.Callback
deleteAtEOF bool
needsUpdateFingerprint bool
compression string
}

// ReadToEnd will read until the end of the file
func (r *Reader) ReadToEnd(ctx context.Context) {
if r.reader == nil {
// Nothing to do. This can happen if it was already determined that we're at EOF.
return
switch r.compression {
case "gzip":
// We need to create a gzip reader each time ReadToEnd is called because the underlying
// SectionReader can only read a fixed window (from previous offset to EOF).
info, err := r.file.Stat()
if err != nil {
r.set.Logger.Error("Failed to stat", zap.Error(err))
return
}
currentEOF := info.Size()

// use a gzip Reader with an underlying SectionReader to pick up at the last
// offset of a gzip compressed file
gzipReader, err := gzip.NewReader(io.NewSectionReader(r.file, r.Offset, currentEOF))
if err != nil {
if !errors.Is(err, io.EOF) {
r.set.Logger.Error("Failed to create gzip reader", zap.Error(err))
}
return
} else {
r.reader = gzipReader
}
// Offset tracking in an uncompressed file is based on the length of emitted tokens, but in this case
// we need to set the offset to the end of the file.
defer func() {
r.Offset = currentEOF
}()
default:
r.reader = r.file
}

if _, err := r.file.Seek(r.Offset, 0); err != nil {
Expand All @@ -66,9 +93,6 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
if r.needsUpdateFingerprint {
r.updateFingerprint()
}
if r.deferFunc != nil {
r.deferFunc()
}
}()

s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)
Expand Down