Skip to content

Commit

Permalink
Fix issue #25 (#26)
Browse files Browse the repository at this point in the history
- fix result sets and statements closing.
- introduce a new behaviour in Liquibase compliance mode to run multiple
queries in the same statement synchronously (by default, they are executed
asynchronously).
- return the schema name instead of null when the method
CassandraConnection.getCatalog() is called in Liquibase compliance mode.
- does not throw SQLFeatureNotSupportedException when
CassandraConnection.rollback() is called in Liquibase compliance mode.
  • Loading branch information
maximevw authored Sep 3, 2023
1 parent 2202725 commit b66035e
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 76 deletions.
18 changes: 9 additions & 9 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,16 @@ local.properties

# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
.idea/artifacts
.idea/compiler.xml
.idea/jarRepositories.xml
.idea/modules.xml
.idea/*.iml
.idea/modules
*.iml
*.ipr

# CMake
cmake-build-*/
Expand Down
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [4.9.1] - 2023-09-03
### Fixed
- Fix issue [#25](https://github.com/ing-bank/cassandra-jdbc-wrapper/issues/25) causing failure when running with
Liquibase. The fix includes several changes:
- fixes result sets and statements closing.
- introduces a new behaviour in Liquibase compliance mode to run multiple queries in the same statement synchronously
(by default, they are executed asynchronously).
- returns the schema name instead of `null` when the method `CassandraConnection.getCatalog()` is called in Liquibase
compliance mode.
- does not throw `SQLFeatureNotSupportedException` when `CassandraConnection.rollback()` is called in Liquibase
compliance mode.

## [4.9.0] - 2023-04-15
### Added
- Add non-JDBC standard [JSON support](https://cassandra.apache.org/doc/latest/cassandra/cql/json.html) with the
Expand Down Expand Up @@ -121,6 +133,7 @@ For this version, the changelog lists the main changes comparatively to the late
- Fix logs in `CassandraConnection` constructor.

[original project]: https://github.com/adejanovski/cassandra-jdbc-wrapper/
[4.9.1]: https://github.com/ing-bank/cassandra-jdbc-wrapper/compare/v4.9.0...v4.9.1
[4.9.0]: https://github.com/ing-bank/cassandra-jdbc-wrapper/compare/v4.8.0...v4.9.0
[4.8.0]: https://github.com/ing-bank/cassandra-jdbc-wrapper/compare/v4.7.0...v4.8.0
[4.7.0]: https://github.com/ing-bank/cassandra-jdbc-wrapper/compare/v4.6.0...v4.7.0
Expand Down
19 changes: 14 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ For further information about connecting to DBaaS, see [cloud documentation](htt
### Compliance modes

For some specific usages, the default behaviour of some JDBC implementations has to be modified. That's why you can
use the argument `compliancemode` in the JDBC URL to cutomize the behaviour of some methods.
use the argument `compliancemode` in the JDBC URL to customize the behaviour of some methods.

The values currently allowed for this argument are:
* `Default`: mode activated by default if not specified in the JDBC URL. It implements the methods detailed below as
Expand All @@ -293,10 +293,19 @@ The values currently allowed for this argument are:

Here are the behaviours defined by the compliance modes listed above:

| Method | Default mode | Liquibase mode |
|--------------------------------------------|---------------------------------------------------------------------------------------------------|----------------|
| `CassandraConnection.getCatalog()` | returns the result of the query`SELECT cluster_name FROM system.local` or `null` if not available | returns `null` |
| `CassandraStatement.executeUpdate(String)` | returns 0 | returns -1 |
| Method | Default mode | Liquibase mode |
|--------------------------------------------|---------------------------------------------------------------------------------------------------|--------------------------------------------------------|
| `CassandraConnection.getCatalog()` | returns the result of the query`SELECT cluster_name FROM system.local` or `null` if not available | returns the schema name if available, `null` otherwise |
| `CassandraConnection.rollback()` | throws a `SQLFeatureNotSupportedException` | do nothing more after checking connection is open |
| `CassandraStatement.executeUpdate(String)` | returns 0 | returns -1 |

For the following methods: `CassandraStatement.execute(String)`, `CassandraStatement.executeQuery(String)` and
`CassandraStatement.executeUpdate(String)`, if the CQL statement includes several queries (separated by semicolons),
the behaviour is the following:

| Default mode | Liquibase mode |
|-------------------------------------|------------------------------------|
| executes the queries asynchronously | executes the queries synchronously |

### Using simple statements

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.ing.data</groupId>
<artifactId>cassandra-jdbc-wrapper</artifactId>
<version>4.9.0</version>
<version>4.9.1</version>
<packaging>jar</packaging>

<name>Cassandra JDBC Wrapper</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,9 @@ public CassandraPreparedStatement prepareStatement(final String cql, final int r
@Override
public void rollback() throws SQLException {
checkNotClosed();
throw new SQLFeatureNotSupportedException(ALWAYS_AUTOCOMMIT);
if (this.optionSet.shouldThrowExceptionOnRollback()) {
throw new SQLFeatureNotSupportedException(ALWAYS_AUTOCOMMIT);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public class CassandraMetadataResultSet extends AbstractResultSet implements Cas
private int fetchDirection;
private int fetchSize;
private boolean wasNull;
private boolean isClosed;
// Result set from the Cassandra driver.
private MetadataResultSet driverResultSet;

Expand All @@ -137,6 +138,7 @@ public class CassandraMetadataResultSet extends AbstractResultSet implements Cas
CassandraMetadataResultSet() {
this.metadata = new CResultSetMetaData();
this.statement = null;
this.isClosed = false;
}

/**
Expand All @@ -156,6 +158,7 @@ public class CassandraMetadataResultSet extends AbstractResultSet implements Cas
this.fetchSize = statement.getFetchSize();
this.driverResultSet = metadataResultSet;
this.rowsIterator = metadataResultSet.iterator();
this.isClosed = false;

// Initialize the column values from the first row.
// Note that the first call to next() will harmlessly re-write these values for the columns. The row cursor
Expand Down Expand Up @@ -250,7 +253,7 @@ public void clearWarnings() throws SQLException {
@Override
public void close() throws SQLException {
if (!isClosed()) {
this.statement.close();
this.isClosed = true;
}
}

Expand Down Expand Up @@ -961,10 +964,7 @@ public boolean isBeforeFirst() throws SQLException {

@Override
public boolean isClosed() {
if (this.statement == null) {
return true;
}
return this.statement.isClosed();
return this.isClosed;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public class CassandraResultSet extends AbstractResultSet
private int fetchDirection;
private int fetchSize;
private boolean wasNull;
private boolean isClosed;
// Result set from the Cassandra driver.
private ResultSet driverResultSet;

Expand All @@ -185,6 +186,7 @@ public class CassandraResultSet extends AbstractResultSet
CassandraResultSet() {
this.metadata = new CResultSetMetaData();
this.statement = null;
this.isClosed = false;
}

/**
Expand All @@ -203,6 +205,7 @@ public class CassandraResultSet extends AbstractResultSet
this.fetchSize = statement.getFetchSize();
this.driverResultSet = resultSet;
this.rowsIterator = resultSet.iterator();
this.isClosed = false;

// Initialize the column values from the first row.
if (hasMoreRows()) {
Expand All @@ -225,6 +228,7 @@ public class CassandraResultSet extends AbstractResultSet
this.resultSetType = statement.getResultSetType();
this.fetchDirection = statement.getFetchDirection();
this.fetchSize = statement.getFetchSize();
this.isClosed = false;

// We have several result sets, but we will use only the first one for metadata needs.
this.driverResultSet = resultSets.get(0);
Expand Down Expand Up @@ -324,7 +328,7 @@ public void clearWarnings() throws SQLException {
@Override
public void close() throws SQLException {
if (!isClosed()) {
this.statement.close();
this.isClosed = true;
}
}

Expand Down Expand Up @@ -1366,10 +1370,7 @@ public boolean isBeforeFirst() throws SQLException {

@Override
public boolean isClosed() {
if (this.statement == null) {
return true;
}
return this.statement.isClosed();
return this.isClosed;
}

@Override
Expand Down
109 changes: 62 additions & 47 deletions src/main/java/com/ing/data/cassandra/jdbc/CassandraStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public class CassandraStatement extends AbstractStatement
* The consistency level used for the statement.
*/
protected ConsistencyLevel consistencyLevel;

private boolean isClosed;
private DriverExecutionProfile customTimeoutProfile;

/**
Expand Down Expand Up @@ -210,6 +210,7 @@ public class CassandraStatement extends AbstractStatement
this.cql = cql;
this.batchQueries = new ArrayList<>();
this.consistencyLevel = connection.getDefaultConsistencyLevel();
this.isClosed = false;

if (!(resultSetType == ResultSet.TYPE_FORWARD_ONLY
|| resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE
Expand Down Expand Up @@ -264,7 +265,7 @@ public void clearWarnings() throws SQLException {

@Override
public void close() {
this.connection = null;
this.isClosed = true;
this.cql = null;
}

Expand All @@ -284,67 +285,68 @@ private void doExecute(final String cql) throws SQLException {

try {
final String[] cqlQueries = cql.split(STATEMENTS_SEPARATOR_REGEX);
if (cqlQueries.length > 1 && !(cql.trim().toLowerCase().startsWith("begin")
if (cqlQueries.length > 1
&& !(cql.trim().toLowerCase().startsWith("begin")
&& cql.toLowerCase().contains("batch") && cql.toLowerCase().contains("apply"))) {
// Several statements in the query to execute asynchronously...

final ArrayList<com.datastax.oss.driver.api.core.cql.ResultSet> results = new ArrayList<>();

// Several statements in the query to execute asynchronously...
if (cqlQueries.length > MAX_ASYNC_QUERIES * 1.1) {
// Protect the cluster from receiving too many queries at once and force the dev to split the load
throw new SQLNonTransientException("Too many queries at once (" + cqlQueries.length
+ "). You must split your queries into more batches !");
}

StringBuilder prevCqlQuery = new StringBuilder();
for (final String cqlQuery : cqlQueries) {
if ((cqlQuery.contains("'") && ((StringUtils.countMatches(cqlQuery, "'") % 2 == 1
&& prevCqlQuery.length() == 0)
|| (StringUtils.countMatches(cqlQuery, "'") % 2 == 0 && prevCqlQuery.length() > 0)))
|| (prevCqlQuery.toString().length() > 0 && !cqlQuery.contains("'"))) {
prevCqlQuery.append(cqlQuery).append(";");
} else {
prevCqlQuery.append(cqlQuery);
if (LOG.isTraceEnabled() || this.connection.isDebugMode()) {
LOG.debug("CQL: {}", prevCqlQuery);
}
SimpleStatement stmt = SimpleStatement.newInstance(prevCqlQuery.toString())
.setConsistencyLevel(this.connection.getDefaultConsistencyLevel())
.setPageSize(this.fetchSize);
if (this.customTimeoutProfile != null) {
stmt = stmt.setExecutionProfile(this.customTimeoutProfile);
// If we should not execute the queries asynchronously, for example if they must be executed in the
// specified order (e.g. in Liquibase scripts with queries such as CREATE TABLE t, then
// INSERT INTO t ...).
if (!this.connection.getOptionSet().executeMultipleQueriesByStatementAsync()) {
for (final String cqlQuery : cqlQueries) {
final com.datastax.oss.driver.api.core.cql.ResultSet rs = executeSingleStatement(cqlQuery);
results.add(rs);
}
} else {
StringBuilder prevCqlQuery = new StringBuilder();
for (final String cqlQuery : cqlQueries) {
if ((cqlQuery.contains("'") && ((StringUtils.countMatches(cqlQuery, "'") % 2 == 1
&& prevCqlQuery.length() == 0)
|| (StringUtils.countMatches(cqlQuery, "'") % 2 == 0 && prevCqlQuery.length() > 0)))
|| (!prevCqlQuery.toString().isEmpty() && !cqlQuery.contains("'"))) {
prevCqlQuery.append(cqlQuery).append(";");
} else {
prevCqlQuery.append(cqlQuery);
if (LOG.isTraceEnabled() || this.connection.isDebugMode()) {
LOG.debug("CQL: {}", prevCqlQuery);
}
SimpleStatement stmt = SimpleStatement.newInstance(prevCqlQuery.toString())
.setConsistencyLevel(this.connection.getDefaultConsistencyLevel())
.setPageSize(this.fetchSize);
if (this.customTimeoutProfile != null) {
stmt = stmt.setExecutionProfile(this.customTimeoutProfile);
}
final CompletionStage<AsyncResultSet> resultSetFuture =
((CqlSession) this.connection.getSession()).executeAsync(stmt);
futures.add(resultSetFuture);
prevCqlQuery = new StringBuilder();
}
final CompletionStage<AsyncResultSet> resultSetFuture =
((CqlSession) this.connection.getSession()).executeAsync(stmt);
futures.add(resultSetFuture);
prevCqlQuery = new StringBuilder();
}
}

for (final CompletionStage<AsyncResultSet> future : futures) {
final AsyncResultSet asyncResultSet = CompletableFutures.getUninterruptibly(future);
final com.datastax.oss.driver.api.core.cql.ResultSet rows;
if (asyncResultSet.hasMorePages()) {
rows = new MultiPageResultSet(asyncResultSet);
} else {
rows = new SinglePageResultSet(asyncResultSet);
for (final CompletionStage<AsyncResultSet> future : futures) {
final AsyncResultSet asyncResultSet = CompletableFutures.getUninterruptibly(future);
final com.datastax.oss.driver.api.core.cql.ResultSet rows;
if (asyncResultSet.hasMorePages()) {
rows = new MultiPageResultSet(asyncResultSet);
} else {
rows = new SinglePageResultSet(asyncResultSet);
}
results.add(rows);
}
results.add(rows);
}

this.currentResultSet = new CassandraResultSet(this, results);
} else {
// Only one statement to execute, so do it synchronously.
if (LOG.isTraceEnabled() || this.connection.isDebugMode()) {
LOG.debug("CQL: " + cql);
}
SimpleStatement stmt = SimpleStatement.newInstance(cql)
.setConsistencyLevel(this.connection.getDefaultConsistencyLevel())
.setPageSize(this.fetchSize);
if (this.customTimeoutProfile != null) {
stmt = stmt.setExecutionProfile(this.customTimeoutProfile);
}
this.currentResultSet = new CassandraResultSet(this,
((CqlSession) this.connection.getSession()).execute(stmt));
this.currentResultSet = new CassandraResultSet(this, executeSingleStatement(cql));
}
} catch (final Exception e) {
for (final CompletionStage<AsyncResultSet> future : futures) {
Expand All @@ -354,6 +356,19 @@ private void doExecute(final String cql) throws SQLException {
}
}

private com.datastax.oss.driver.api.core.cql.ResultSet executeSingleStatement(final String cql) {
if (LOG.isTraceEnabled() || this.connection.isDebugMode()) {
LOG.debug("CQL: " + cql);
}
SimpleStatement stmt = SimpleStatement.newInstance(cql)
.setConsistencyLevel(this.connection.getDefaultConsistencyLevel())
.setPageSize(this.fetchSize);
if (this.customTimeoutProfile != null) {
stmt = stmt.setExecutionProfile(this.customTimeoutProfile);
}
return ((CqlSession) this.connection.getSession()).execute(stmt);
}

@Override
public boolean execute(final String query) throws SQLException {
checkNotClosed();
Expand Down Expand Up @@ -647,7 +662,7 @@ public SQLWarning getWarnings() throws SQLException {

@Override
public boolean isClosed() {
return this.connection == null;
return this.isClosed;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,13 @@ public int getSQLUpdateResponse() {
return 0;
}

@Override
public boolean shouldThrowExceptionOnRollback() {
return true;
}

@Override
public boolean executeMultipleQueriesByStatementAsync() {
return true;
}
}
Loading

0 comments on commit b66035e

Please sign in to comment.