Skip to content

Commit

Permalink
Fix replay of files with malformed records (#1015)
Browse files Browse the repository at this point in the history
In some cases recorded files contain small amount malformed records.
The root cause is not clear, maybe multiple processes writing to the same file.

This change ensuer that file can be replayed, and malformed records will be ingored (with meaningful debug message showing exact line in the file with issue).

Additionally `--input-file-dry-run` mode was speed up a bit, since there were a few of Sleep statements which were not ignored when dry run is executed.
  • Loading branch information
buger committed Oct 6, 2021
1 parent 80947b0 commit 41da139
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions input_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,20 @@ type fileInputReader struct {
s3 bool
queue payloadQueue
readDepth int
dryRun bool
path string
}

func (f *fileInputReader) parse(init chan struct{}) error {
payloadSeparatorAsBytes := []byte(payloadSeparator)
var buffer bytes.Buffer
var initialized bool

lineNum := 0

for {
line, err := f.reader.ReadBytes('\n')
lineNum++

if err != nil {
if err != io.EOF {
Expand All @@ -92,6 +97,12 @@ func (f *fileInputReader) parse(init chan struct{}) error {
asBytes := buffer.Bytes()
meta := payloadMeta(asBytes)

if len(meta) < 3 {
Debug(1, fmt.Sprintf("Found malformed record, file: %s, line %d", f.path, lineNum))
buffer = bytes.Buffer{}
continue
}

timestamp, _ := strconv.ParseInt(string(meta[2]), 10, 64)
data := asBytes[:len(asBytes)-1]

Expand All @@ -112,7 +123,9 @@ func (f *fileInputReader) parse(init chan struct{}) error {
initialized = true
}

time.Sleep(100 * time.Millisecond)
if !f.dryRun {
time.Sleep(100 * time.Millisecond)
}
}

buffer = bytes.Buffer{}
Expand All @@ -133,7 +146,9 @@ func (f *fileInputReader) wait() {
return
}

time.Sleep(100 * time.Millisecond)
if !f.dryRun {
time.Sleep(100 * time.Millisecond)
}
}

return
Expand All @@ -149,7 +164,7 @@ func (f *fileInputReader) Close() error {
return nil
}

func newFileInputReader(path string, readDepth int) *fileInputReader {
func newFileInputReader(path string, readDepth int, dryRun bool) *fileInputReader {
var file io.ReadCloser
var err error

Expand All @@ -164,7 +179,7 @@ func newFileInputReader(path string, readDepth int) *fileInputReader {
return nil
}

r := &fileInputReader{file: file, closed: 0, readDepth: readDepth}
r := &fileInputReader{path: path, file: file, closed: 0, readDepth: readDepth, dryRun: dryRun}
if strings.HasSuffix(path, ".gz") {
gzReader, err := gzip.NewReader(file)
if err != nil {
Expand Down Expand Up @@ -262,7 +277,7 @@ func (i *FileInput) init() (err error) {
i.readers = make([]*fileInputReader, len(matches))

for idx, p := range matches {
i.readers[idx] = newFileInputReader(p, i.readDepth)
i.readers[idx] = newFileInputReader(p, i.readDepth, i.dryRun)
}

i.stats.Add("reader_count", int64(len(matches)))
Expand Down

0 comments on commit 41da139

Please sign in to comment.