Merge pull request #2 from djaglowski/gzip-reader-readtoend
Move gzip internal reader creation into ReadToEnd
bacherfl authored Jun 14, 2024
2 parents 0b403a8 + bb6708b commit 1bd6f0b
Showing 2 changed files with 32 additions and 34 deletions.
28 changes: 1 addition & 27 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
@@ -5,10 +5,8 @@ package reader // import "github.com/open-telemetry/opentelemetry-collector-cont
 
 import (
 	"bufio"
-	"compress/gzip"
 	"errors"
 	"fmt"
-	"io"
 	"os"
 	"time"
 
@@ -76,6 +74,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
 		decoder:       decode.New(f.Encoding),
 		lineSplitFunc: f.SplitFunc,
 		deleteAtEOF:   f.DeleteAtEOF,
+		compression:   f.Compression,
 	}
 	r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))
 
@@ -123,30 +122,5 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
 		r.FileAttributes[k] = v
 	}
 
-	switch f.Compression {
-	case "gzip":
-		info, err := r.file.Stat()
-		if err != nil {
-			return nil, fmt.Errorf("gzip stat: %w", err)
-		}
-
-		// use a gzip Reader with an underlying SectionReader to pick up at the last
-		// offset of a gzip compressed file
-		gzipReader, err := gzip.NewReader(io.NewSectionReader(file, r.Offset, info.Size()))
-		if err != nil {
-			if !errors.Is(err, io.EOF) {
-				return nil, fmt.Errorf("new gzip: %w", err)
-			}
-		} else {
-			r.reader = gzipReader
-		}
-		r.deferFunc = func() {
-			// set the offset of the reader to the end of the file to ensure the offset is not set to the
-			// position of the scanner, which operates on the uncompressed data
-			r.Offset = info.Size()
-		}
-	default:
-		r.reader = file
-	}
 	return r, nil
 }
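The block removed above built the gzip reader once, at factory time, over an io.SectionReader whose end was fixed when the Reader was constructed. A minimal standalone sketch of that limitation (not part of this commit; the temp-file setup and the appendGzip helper are invented for illustration): a SectionReader is bounded by the size passed at creation, so gzip data appended to the file afterwards is never visible to a reader built this early.

package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "demo-*.gz")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())

	appendGzip(f, "first poll\n")
	info, _ := f.Stat()

	// Built once, the way the factory used to do it: the window is fixed at [0, info.Size()).
	zr, err := gzip.NewReader(io.NewSectionReader(f, 0, info.Size()))
	if err != nil {
		panic(err)
	}

	appendGzip(f, "second poll\n") // appended after the window was fixed

	data, _ := io.ReadAll(zr)
	fmt.Printf("%q\n", data) // prints "first poll\n" only; the appended member is outside the window
}

// appendGzip writes s as a complete gzip member at the current end of f.
func appendGzip(f *os.File, s string) {
	zw := gzip.NewWriter(f)
	zw.Write([]byte(s))
	zw.Close()
}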
38 changes: 31 additions & 7 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -5,6 +5,7 @@ package reader // import "github.com/open-telemetry/opentelemetry-collector-cont
 
 import (
 	"bufio"
+	"compress/gzip"
 	"context"
 	"errors"
 	"io"
@@ -41,20 +42,46 @@ type Reader struct {
 	maxLogSize             int
 	lineSplitFunc          bufio.SplitFunc
 	splitFunc              bufio.SplitFunc
-	deferFunc              func()
 	decoder                *decode.Decoder
 	headerReader           *header.Reader
 	processFunc            emit.Callback
 	emitFunc               emit.Callback
 	deleteAtEOF            bool
 	needsUpdateFingerprint bool
+	compression            string
 }
 
 // ReadToEnd will read until the end of the file
 func (r *Reader) ReadToEnd(ctx context.Context) {
-	if r.reader == nil {
-		// Nothing to do. This can happen if it was already determined that we're at EOF.
-		return
+	switch r.compression {
+	case "gzip":
+		// We need to create a gzip reader each time ReadToEnd is called because the underlying
+		// SectionReader can only read a fixed window (from previous offset to EOF).
+		info, err := r.file.Stat()
+		if err != nil {
+			r.set.Logger.Error("Failed to stat", zap.Error(err))
+			return
+		}
+		currentEOF := info.Size()
+
+		// use a gzip Reader with an underlying SectionReader to pick up at the last
+		// offset of a gzip compressed file
+		gzipReader, err := gzip.NewReader(io.NewSectionReader(r.file, r.Offset, currentEOF))
+		if err != nil {
+			if !errors.Is(err, io.EOF) {
+				r.set.Logger.Error("Failed to create gzip reader", zap.Error(err))
+			}
+			return
+		} else {
+			r.reader = gzipReader
+		}
+		// Offset tracking in an uncompressed file is based on the length of emitted tokens, but in this case
+		// we need to set the offset to the end of the file.
+		defer func() {
+			r.Offset = currentEOF
+		}()
+	default:
+		r.reader = r.file
 	}
 
 	if _, err := r.file.Seek(r.Offset, 0); err != nil {
@@ -66,9 +93,6 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 		if r.needsUpdateFingerprint {
 			r.updateFingerprint()
 		}
-		if r.deferFunc != nil {
-			r.deferFunc()
-		}
 	}()
 
 	s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)
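The new ReadToEnd rebuilds the gzip reader on every poll: stat the file, open a gzip.Reader over an io.SectionReader that starts at the saved offset, read everything, and only then advance the offset to the current EOF. A minimal, self-contained sketch of that pattern (not the actual collector code; the readToEnd and appendGzip helpers are invented, it assumes each append is a complete gzip member so the saved offset always lands on a gzip header, and it trims the section length to currentEOF-offset, a slight simplification of the diff, which passes the full file size):

package main

import (
	"compress/gzip"
	"errors"
	"fmt"
	"io"
	"os"
)

// readToEnd reads everything appended since offset and returns the new offset.
func readToEnd(f *os.File, offset int64) (int64, error) {
	info, err := f.Stat()
	if err != nil {
		return offset, err
	}
	currentEOF := info.Size()

	// A fresh gzip reader over [offset, currentEOF) on every call.
	zr, err := gzip.NewReader(io.NewSectionReader(f, offset, currentEOF-offset))
	if err != nil {
		if errors.Is(err, io.EOF) {
			return offset, nil // nothing new since the last poll
		}
		return offset, err
	}
	defer zr.Close()

	data, err := io.ReadAll(zr)
	if err != nil {
		return offset, err
	}
	fmt.Printf("read: %q\n", data)

	// Offsets in the compressed file don't correspond to positions in the uncompressed
	// data seen by the scanner, so the offset is simply advanced to the end of the file.
	return currentEOF, nil
}

// appendGzip writes s as a complete gzip member at the current end of f.
func appendGzip(f *os.File, s string) {
	zw := gzip.NewWriter(f)
	zw.Write([]byte(s))
	zw.Close()
}

func main() {
	f, err := os.CreateTemp("", "poll-*.gz")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())

	appendGzip(f, "first batch\n")
	offset, err := readToEnd(f, 0) // read: "first batch\n"
	if err != nil {
		panic(err)
	}

	appendGzip(f, "second batch\n")
	if _, err := readToEnd(f, offset); err != nil { // read: "second batch\n"
		panic(err)
	}
}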
