Incorrect skipping of records when reprocessing file #190

chipps opened this issue Sep 9, 2021 · 2 comments

chipps commented Sep 9, 2021

The CSV connector is incorrectly under-skipping records when reprocessing a file.
Assume there is a CSV file named EMPS.csv with 1 header record and 10 data records.
When the connector processes the file the first time, 10 records are properly inserted into the topic.
When the same file is copied back for reprocessing, it should in theory NOT add any more records, but it inserts a single record, the 10th record, into the topic again. SpoolDirCsvSourceTask gets an offset of 10, but its skip loop stops one record short: it only skips 8 records when it should skip 9.

Lines 71-76 of SpoolDirCsvSourceTask.java are shown below.

if (null != lastOffset) {
  log.info("Found previous offset. Skipping {} line(s).", lastOffset.intValue());
  String[] row = null;
  while (null != (row = this.csvReader.readNext()) && this.csvReader.getLinesRead() < lastOffset) {
    log.trace("skipped row");
  }
}

I think line 75 should be changed to use <= rather than <

  while (null != (row = this.csvReader.readNext()) && this.csvReader.getLinesRead() <= lastOffset) {

Below is a program that demonstrates the problem: when the loop exits, getLinesRead() has already reached the offset, but readNext() has not yet consumed every record that the offset covers.


neil@kaf-vbox:~/code$ cat *.java

import java.io.FileReader;

import com.opencsv.CSVReader;

/**
 * OpenCSV CSVReader Example, Read line by line
 */
public class OpenCSVReaderLineByLineExample {

    // "throws Exception" covers IOException plus the checked
    // CsvValidationException that newer opencsv versions declare on readNext()
    public static void main(String[] args) throws Exception {

        // single-argument constructor; the default separator is a comma
        CSVReader reader = new CSVReader(new FileReader("emps.csv"));
        int offset = 10;

        // read line by line
        String[] record = null;

        // header row
        record = reader.readNext();
        System.out.println(record[0] + " HD " + reader.getLinesRead() + " (getLinesRead)");

        // same loop condition as the skip loop in SpoolDirCsvSourceTask
        while (((record = reader.readNext()) != null) && (reader.getLinesRead() < offset)) {
            System.out.println(record[0] + " -- " + reader.getLinesRead());
        }

        reader.close();
    }

}


Here is the emps.csv file


Header
1
2
3
4
5
6
7
8
9
10

Here is the output of running the program with <


Header HD 1 (getLinesRead)
1 -- 2
2 -- 3
3 -- 4
4 -- 5
5 -- 6
6 -- 7
7 -- 8
8 -- 9

As you can see, the loop ended when getLinesRead() returned 10, but at that point readNext() had only finished reading the 9th record, so there is still ONE more record ready to be read by readNext().

This is the problem showing up in SpoolDirCsvSourceTask when reprocessing a file.

If you replace < with <=, the loop keeps going when getLinesRead() reaches 10 (9 records skipped so far) and only breaks when getLinesRead() hits 11. By then readNext() has already read the 10th record, so all 10 records have been skipped.
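
The difference between the two operators can be reproduced without Kafka or opencsv. The following is a minimal sketch, not the connector's code: CountingReader is a hypothetical stand-in for CSVReader that only mimics readNext() and getLinesRead() for single-line records, and leftoverAfterSkip is an illustrative helper that consumes the header first, as the demo program does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for opencsv's CSVReader: one record per line,
// getLinesRead() counts every line returned so far (header included).
class CountingReader {
    private final Iterator<String> lines;
    private long linesRead = 0;

    CountingReader(List<String> lines) {
        this.lines = lines.iterator();
    }

    String[] readNext() {
        if (!lines.hasNext()) {
            return null; // end of input
        }
        linesRead++;
        return new String[] { lines.next() };
    }

    long getLinesRead() {
        return linesRead;
    }
}

class SkipOperatorDemo {
    // Runs the skip loop from the issue and returns the records that are
    // still unread afterwards, i.e. what would be sent to the topic again.
    static List<String> leftoverAfterSkip(List<String> file, long lastOffset, boolean inclusive) {
        CountingReader reader = new CountingReader(file);
        reader.readNext(); // header row, consumed before the skip loop

        String[] row;
        while (null != (row = reader.readNext())
                && (inclusive ? reader.getLinesRead() <= lastOffset
                              : reader.getLinesRead() < lastOffset)) {
            // skipped row
        }

        // Whatever readNext() still returns was not skipped.
        List<String> leftover = new ArrayList<>();
        while (null != (row = reader.readNext())) {
            leftover.add(row[0]);
        }
        return leftover;
    }

    public static void main(String[] args) {
        List<String> file = Arrays.asList(
                "Header", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
        System.out.println("<  leaves " + leftoverAfterSkip(file, 10, false));
        System.out.println("<= leaves " + leftoverAfterSkip(file, 10, true));
    }
}
```

Under these assumptions the < variant leaves [10] unread while the <= variant leaves nothing, which matches the behavior described in this issue.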

The code as it stands right now in SpoolDirCsvSourceTask only happens to work for the case where there is exactly 1 data record.
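
To see why only the single-record case behaves, the skip loop can be modeled with plain counters. This is a rough sketch under the same assumptions as the issue (the header is read first, each record is one line, and the stored offset equals the number of data records); reEmitted is a hypothetical helper, not connector code.

```java
class SkipLoopModel {
    // Models `while (readNext() != null && getLinesRead() < lastOffset)`
    // with counters only and returns how many data records remain unread.
    static int reEmitted(int dataRecords, boolean inclusive) {
        int lastOffset = dataRecords; // assumption: offset == data record count
        int linesRead = 1;            // header already consumed
        int consumed = 0;             // data records consumed by the skip loop

        while (consumed < dataRecords) { // readNext() would return non-null
            consumed++;                  // readNext() consumes a record...
            linesRead++;                 // ...and advances the line counter
            boolean keepSkipping = inclusive ? linesRead <= lastOffset
                                             : linesRead < lastOffset;
            if (!keepSkipping) {
                break; // the record read just before the check is already gone
            }
        }
        return dataRecords - consumed;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " data record(s): '<' re-emits " + reEmitted(n, false)
                    + ", '<=' re-emits " + reEmitted(n, true));
        }
    }
}
```

In this model < re-emits nothing for 1 data record and exactly one record for 2 or more, while <= re-emits nothing in every case.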

-- Neil

@Bohatman

This also happens when you use the skip lines setting: if it is set to more than 1, the connector reprocesses more than just the last row.

@sidbose87

I am trying to figure this out as well. I am not a pro, but for some reason, even when I changed the topic name and restarted the connector, it still processed the 10th record. For that matter, for every file I push, it loads just the last record into the topic. Any suggestions?
