From 41da139260a33e169d6d876b703e26f88ff89c28 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 6 Oct 2021 20:53:34 +0300 Subject: [PATCH] Fix replay of files with malformed records (#1015) In some cases recorded files contain small amount malformed records. The root cause is not clear, maybe multiple processes writing to the same file. This change ensuer that file can be replayed, and malformed records will be ingored (with meaningful debug message showing exact line in the file with issue). Additionally `--input-file-dry-run` mode was speed up a bit, since there were a few of Sleep statements which were not ignored when dry run is executed. --- input_file.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/input_file.go b/input_file.go index d1e42308..fee47786 100644 --- a/input_file.go +++ b/input_file.go @@ -63,6 +63,8 @@ type fileInputReader struct { s3 bool queue payloadQueue readDepth int + dryRun bool + path string } func (f *fileInputReader) parse(init chan struct{}) error { @@ -70,8 +72,11 @@ func (f *fileInputReader) parse(init chan struct{}) error { var buffer bytes.Buffer var initialized bool + lineNum := 0 + for { line, err := f.reader.ReadBytes('\n') + lineNum++ if err != nil { if err != io.EOF { @@ -92,6 +97,12 @@ func (f *fileInputReader) parse(init chan struct{}) error { asBytes := buffer.Bytes() meta := payloadMeta(asBytes) + if len(meta) < 3 { + Debug(1, fmt.Sprintf("Found malformed record, file: %s, line %d", f.path, lineNum)) + buffer = bytes.Buffer{} + continue + } + timestamp, _ := strconv.ParseInt(string(meta[2]), 10, 64) data := asBytes[:len(asBytes)-1] @@ -112,7 +123,9 @@ func (f *fileInputReader) parse(init chan struct{}) error { initialized = true } - time.Sleep(100 * time.Millisecond) + if !f.dryRun { + time.Sleep(100 * time.Millisecond) + } } buffer = bytes.Buffer{} @@ -133,7 +146,9 @@ func (f *fileInputReader) wait() { return } - time.Sleep(100 * time.Millisecond) + if !f.dryRun { + time.Sleep(100 * time.Millisecond) + } } return @@ -149,7 +164,7 @@ func (f *fileInputReader) Close() error { return nil } -func newFileInputReader(path string, readDepth int) *fileInputReader { +func newFileInputReader(path string, readDepth int, dryRun bool) *fileInputReader { var file io.ReadCloser var err error @@ -164,7 +179,7 @@ func newFileInputReader(path string, readDepth int) *fileInputReader { return nil } - r := &fileInputReader{file: file, closed: 0, readDepth: readDepth} + r := &fileInputReader{path: path, file: file, closed: 0, readDepth: readDepth, dryRun: dryRun} if strings.HasSuffix(path, ".gz") { gzReader, err := gzip.NewReader(file) if err != nil { @@ -262,7 +277,7 @@ func (i *FileInput) init() (err error) { i.readers = make([]*fileInputReader, len(matches)) for idx, p := range matches { - i.readers[idx] = newFileInputReader(p, i.readDepth) + i.readers[idx] = newFileInputReader(p, i.readDepth, i.dryRun) } i.stats.Add("reader_count", int64(len(matches)))