Skip to content

Commit

Permalink
TIKA-3085 -- switch to batch inserts in tika-eval
Browse files Browse the repository at this point in the history
  • Loading branch information
tballison committed Apr 8, 2020
1 parent d2a4f2a commit a3fef30
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 7 deletions.
30 changes: 30 additions & 0 deletions tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public Set<String> getTables(Connection connection) throws SQLException {
return tables;
}

@Deprecated
/**
* @deprecated use {@link #batchInsert(PreparedStatement, TableInfo, Map)}
*/
public static int insert(PreparedStatement insertStatement,
TableInfo table,
Map<Cols, String> data) throws SQLException {
Expand All @@ -165,6 +169,28 @@ public static int insert(PreparedStatement insertStatement,
}
}

public static void batchInsert(PreparedStatement insertStatement,
TableInfo table,
Map<Cols, String> data) throws SQLException {

try {
int i = 1;
for (ColInfo colInfo : table.getColInfos()) {
updateInsertStatement(i, insertStatement, colInfo, data.get(colInfo.getName()));
i++;
}
for (Cols c : data.keySet()) {
if (!table.containsColumn(c)) {
throw new IllegalArgumentException("Can't add data to " + c +
" because it doesn't exist in the table: " + table.getName());
}
}
insertStatement.addBatch();
} catch (SQLException e) {
LOG.warn("couldn't insert data for this row: {}", e.getMessage());
}
}

public static void updateInsertStatement(int dbColOffset, PreparedStatement st,
ColInfo colInfo, String value) throws SQLException {
if (value == null) {
Expand All @@ -178,9 +204,13 @@ public static void updateInsertStatement(int dbColOffset, PreparedStatement st,
value = value.substring(0, colInfo.getPrecision());
LOG.warn("truncated varchar value in {} : {}", colInfo.getName(), value);
}
//postgres doesn't allow \0000
value = value.replaceAll("\u0000", " ");
st.setString(dbColOffset, value);
break;
case Types.CHAR:
//postgres doesn't allow \0000
value = value.replaceAll("\u0000", " ");
st.setString(dbColOffset, value);
break;
case Types.DOUBLE:
Expand Down
35 changes: 28 additions & 7 deletions tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class DBWriter implements IDBWriter {
private static final Logger LOG = LoggerFactory.getLogger(DBWriter.class);

private static final AtomicInteger WRITER_ID = new AtomicInteger();
private final AtomicLong insertedRows = new AtomicLong();
private final Long commitEveryX = 1000L;
private final Long commitEveryXRows = 10000L;
//private final Long commitEveryXMS = 60000L;

private final Connection conn;
private final JDBCUtil dbUtil;
Expand All @@ -60,7 +60,7 @@ public class DBWriter implements IDBWriter {

//<tableName, preparedStatement>
private final Map<String, PreparedStatement> inserts = new HashMap<>();

private final Map<String, LastInsert> lastInsertMap = new HashMap<>();
public DBWriter(Connection connection, List<TableInfo> tableInfos, JDBCUtil dbUtil, MimeBuffer mimeBuffer)
throws IOException, SQLException {

Expand All @@ -71,6 +71,7 @@ public DBWriter(Connection connection, List<TableInfo> tableInfos, JDBCUtil dbUt
try {
PreparedStatement st = createPreparedInsert(tableInfo);
inserts.put(tableInfo.getName(), st);
lastInsertMap.put(tableInfo.getName(), new LastInsert());
} catch (SQLException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -115,18 +116,33 @@ public void writeRow(TableInfo table, Map<Cols, String> data) throws IOException
throw new RuntimeException("Failed to create prepared statement for: "+
table.getName());
}
dbUtil.insert(p, table, data);
long rows = insertedRows.incrementAndGet();
if (rows % commitEveryX == 0) {
LOG.debug("writer ({}) is committing after {} rows", myId, rows);
dbUtil.batchInsert(p, table, data);
LastInsert lastInsert = lastInsertMap.get(table.getName());
lastInsert.rowCount++;
long elapsed = System.currentTimeMillis()-lastInsert.lastInsert;
if (
//elapsed > commitEveryXMS ||
lastInsert.rowCount % commitEveryXRows == 0) {
LOG.info("writer ({}) on table ({}) is committing after {} rows and {} ms", myId,
table.getName(),
lastInsert.rowCount, elapsed);
p.executeBatch();
conn.commit();
lastInsert.lastInsert = System.currentTimeMillis();
}
} catch (SQLException e) {
throw new IOException(e);
}
}

public void close() throws IOException {
for (PreparedStatement p : inserts.values()) {
try {
p.executeBatch();
} catch (SQLException e) {
throw new IOExceptionWithCause(e);
}
}
try {
conn.commit();
} catch (SQLException e){
Expand All @@ -139,4 +155,9 @@ public void close() throws IOException {
}

}

private class LastInsert {
private long lastInsert = System.currentTimeMillis();
private long rowCount = 0;
}
}

0 comments on commit a3fef30

Please sign in to comment.