Skip to content

Commit

Permalink
Fix issue 297: incompatibility with AWS Keyspaces (complement) (#308)
Browse files Browse the repository at this point in the history
* Wait for databasechangeloglock table is ready in AWS keyspaces

Complementary fix for issue #297

* Fix SQL syntax for table status check in AWS and add logs in case of error

* Use lowercase table name to correctly match name in AWS system table
  • Loading branch information
maximevw authored Aug 13, 2024
1 parent 379be7c commit cb92f8e
Showing 1 changed file with 56 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import liquibase.exception.UnexpectedLiquibaseException;
import liquibase.executor.Executor;
import liquibase.executor.ExecutorService;
import liquibase.executor.LoggingExecutor;
import liquibase.ext.cassandra.database.CassandraDatabase;
import liquibase.lockservice.StandardLockService;
import liquibase.statement.core.LockDatabaseChangeLogStatement;
Expand Down Expand Up @@ -150,6 +151,15 @@ public boolean isDatabaseChangeLogLockTableInitialized(final boolean tableJustCr
if (!isDatabaseChangeLogLockTableInitialized || forceRecheck) {
Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor("jdbc", database);

// For AWS Keyspaces, it could be necessary to wait the table is ready in all the nodes of the cluster
// before querying it.
if (isAwsKeyspacesCompatibilityModeEnabled()) {
if (!waitForDatabaseChangeLogLockTableReady(executor)) {
throw new UnexpectedLiquibaseException(
"Waiting for databaseChangeLogLock table ready failed or timed out");
}
}

try {
isDatabaseChangeLogLockTableInitialized = executeCountQuery(executor,
"SELECT COUNT(*) FROM " + getChangeLogLockTableName()) > 0;
Expand Down Expand Up @@ -213,4 +223,50 @@ private int executeCountQuery(final Executor executor, final String query) throw
return executor.queryForInt(new RawSqlStatement(query));
}
}

/**
* Execute a query on the AWS Keyspaces {@code tables} table to check if the database changelog lock table is
* ready for querying (status {@code ACTIVE}).
* <p>
* See: <a href="https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-tables.html#tables-create">
* AWS documentation about creating tables in Keyspaces</a>.
* </p>
*
* @param executor The query executor.
* @return {@code true} if the table is ready before reaching the maximal number of attempts, {@code false}
* otherwise.
*/
private boolean waitForDatabaseChangeLogLockTableReady(final Executor executor) {
int maxAttempts = 20;
final String tableName = database.getDatabaseChangeLogLockTableName();
if (executor instanceof LoggingExecutor) {
return true;
}
boolean isTableActive;
try {
int attempt = 1;
do {
try {
Thread.sleep(getChangeLogLockRecheckTime() * 2000);
} catch (InterruptedException e) {
// Restore thread interrupt status
Thread.currentThread().interrupt();
}
Scope.getCurrentScope().getLog(LockServiceCassandra.class)
.fine("Checking the status of table " + tableName + " (attempt #" + attempt + ")...");
attempt ++;
// Note: the table name must be lowercase to match the name in the table 'system_schema_mcs.tables'
final String tableStatusSqlStatement = "SELECT status FROM system_schema_mcs.tables "
+ "WHERE keyspace_name = '" + database.getLiquibaseCatalogName() + "' "
+ "AND table_name = '" + tableName.toLowerCase() + "'";
String status = executor.queryForObject(new RawSqlStatement(tableStatusSqlStatement), String.class);
isTableActive = "ACTIVE".equalsIgnoreCase(status);
} while (!isTableActive && attempt <= maxAttempts);
} catch (final DatabaseException e) {
Scope.getCurrentScope().getLog(LockServiceCassandra.class)
.warning("Failed to check the status of table " + tableName + " in AWS Keyspaces", e);
return false;
}
return isTableActive;
}
}

0 comments on commit cb92f8e

Please sign in to comment.