Skip to content

Commit

Permalink
NIFI-11898 Handle commit based on driver capabilities in PutDatabaseR…
Browse files Browse the repository at this point in the history
…ecord

This closes apache#7561

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
mattyb149 authored and exceptionfactory committed Aug 3, 2023
1 parent 3a6f86f commit 63c72bd
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.sql.PreparedStatement;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTransientException;
import java.sql.Statement;
Expand Down Expand Up @@ -471,10 +472,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
connectionHolder = Optional.of(connection);

originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
if (originalAutoCommit) {
try {
connection.setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug("setAutoCommit(false) not supported by this driver");
}
}

putToDatabase(context, session, flowFile, connection);
connection.commit();
// Only commit the connection if auto-commit is false
if (!originalAutoCommit) {
connection.commit();
}

session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, getJdbcUrl(connection));
Expand All @@ -496,13 +506,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (rollbackOnFailure) {
session.rollback();
} else {
flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown": e.getMessage()));
session.transfer(flowFile, relationship);
}

connectionHolder.ifPresent(connection -> {
try {
connection.rollback();
if (!connection.getAutoCommit()) {
connection.rollback();
}
} catch (final Exception rollbackException) {
getLogger().error("Failed to rollback JDBC transaction", rollbackException);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.nifi.processors.standard;

import org.apache.commons.dbcp2.DelegatingConnection;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processors.standard.db.ColumnDescription;
Expand Down Expand Up @@ -53,6 +55,7 @@
import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.ZoneOffset;
Expand All @@ -74,13 +77,16 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class PutDatabaseRecordTest {

private static String DBCP_SERVICE_ID = "dbcp";

private static final String CONNECTION_FAILED = "Connection Failed";

private static final String PARSER_ID = MockRecordParser.class.getSimpleName();
Expand All @@ -102,7 +108,7 @@ public class PutDatabaseRecordTest {

TestRunner runner;
PutDatabaseRecord processor;
DBCPServiceSimpleImpl dbcp;
DBCPService dbcp;

@BeforeAll
public static void setDatabaseLocation() {
Expand Down Expand Up @@ -143,9 +149,9 @@ public void setRunner() throws Exception {
final Map<String, String> dbcpProperties = new HashMap<>();

runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
}

@Test
Expand All @@ -166,6 +172,42 @@ public void testGetConnectionFailure() throws InitializationException {
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE);
}

@Test
public void testSetAutoCommitFalseFailure() throws InitializationException, SQLException {
dbcp = new DBCPServiceAutoCommitTest(DB_LOCATION);
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);

recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);

parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.INT);
parser.addSchemaField("dt", RecordFieldType.DATE);

LocalDate testDate1 = LocalDate.of(2021, 1, 26);
Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
LocalDate testDate2 = LocalDate.of(2021, 7, 26);
Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ

parser.addRecord(1, "rec1", 101, jdbcDate1);

runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");

runner.enqueue(new byte[0]);
runner.run();

runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS);
}

@Test
public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException {
// Need to override the @Before method with a new processor that behaves badly
Expand All @@ -176,9 +218,9 @@ public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationEx
final Map<String, String> dbcpProperties = new HashMap<>();

runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);

recreateTable();
final MockRecordParser parser = new MockRecordParser();
Expand Down Expand Up @@ -1766,9 +1808,9 @@ void testInsertWithBlobIntegerArraySource() throws Exception {
void testInsertEnum() throws InitializationException, ProcessException, SQLException, IOException {
dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, new HashMap<>());
runner.addControllerService(DBCP_SERVICE_ID, dbcp, new HashMap<>());
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
try (Connection conn = dbcp.getConnection()) {
conn.createStatement().executeUpdate("DROP TABLE IF EXISTS ENUM_TEST");
}
Expand Down Expand Up @@ -1949,4 +1991,28 @@ SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName
return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES (?,?,?,?)", Arrays.asList(0, 1, 2, 3));
}
}

static class DBCPServiceAutoCommitTest extends AbstractControllerService implements DBCPService {
private final String databaseLocation;

public DBCPServiceAutoCommitTest(final String databaseLocation) {
this.databaseLocation = databaseLocation;
}

@Override
public String getIdentifier() {
return DBCP_SERVICE_ID;
}

@Override
public Connection getConnection() throws ProcessException {
try {
Connection spyConnection = spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true"));
doThrow(SQLFeatureNotSupportedException.class).when(spyConnection).setAutoCommit(false);
return spyConnection;
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

0 comments on commit 63c72bd

Please sign in to comment.