Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue 297: incompatibility with AWS Keyspaces (complement) #308

Merged
merged 4 commits into from
Aug 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import liquibase.exception.UnexpectedLiquibaseException;
import liquibase.executor.Executor;
import liquibase.executor.ExecutorService;
import liquibase.executor.LoggingExecutor;
import liquibase.ext.cassandra.database.CassandraDatabase;
import liquibase.lockservice.StandardLockService;
import liquibase.statement.core.LockDatabaseChangeLogStatement;
Expand Down Expand Up @@ -150,6 +151,15 @@ public boolean isDatabaseChangeLogLockTableInitialized(final boolean tableJustCr
if (!isDatabaseChangeLogLockTableInitialized || forceRecheck) {
Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor("jdbc", database);

// For AWS Keyspaces, it could be necessary to wait the table is ready in all the nodes of the cluster
// before querying it.
if (isAwsKeyspacesCompatibilityModeEnabled()) {
if (!waitForDatabaseChangeLogLockTableReady(executor)) {
throw new UnexpectedLiquibaseException(
"Waiting for databaseChangeLogLock table ready failed or timed out");
}
}

try {
isDatabaseChangeLogLockTableInitialized = executeCountQuery(executor,
"SELECT COUNT(*) FROM " + getChangeLogLockTableName()) > 0;
Expand Down Expand Up @@ -213,4 +223,50 @@ private int executeCountQuery(final Executor executor, final String query) throw
return executor.queryForInt(new RawSqlStatement(query));
}
}

/**
* Execute a query on the AWS Keyspaces {@code tables} table to check if the database changelog lock table is
* ready for querying (status {@code ACTIVE}).
* <p>
* See: <a href="https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-tables.html#tables-create">
* AWS documentation about creating tables in Keyspaces</a>.
* </p>
*
* @param executor The query executor.
* @return {@code true} if the table is ready before reaching the maximal number of attempts, {@code false}
* otherwise.
*/
private boolean waitForDatabaseChangeLogLockTableReady(final Executor executor) {
int maxAttempts = 20;
final String tableName = database.getDatabaseChangeLogLockTableName();
if (executor instanceof LoggingExecutor) {
return true;
}
boolean isTableActive;
try {
int attempt = 1;
do {
try {
Thread.sleep(getChangeLogLockRecheckTime() * 2000);
} catch (InterruptedException e) {
// Restore thread interrupt status
Thread.currentThread().interrupt();
}
Scope.getCurrentScope().getLog(LockServiceCassandra.class)
.fine("Checking the status of table " + tableName + " (attempt #" + attempt + ")...");
attempt ++;
// Note: the table name must be lowercase to match the name in the table 'system_schema_mcs.tables'
final String tableStatusSqlStatement = "SELECT status FROM system_schema_mcs.tables "
+ "WHERE keyspace_name = '" + database.getLiquibaseCatalogName() + "' "
+ "AND table_name = '" + tableName.toLowerCase() + "'";
String status = executor.queryForObject(new RawSqlStatement(tableStatusSqlStatement), String.class);
isTableActive = "ACTIVE".equalsIgnoreCase(status);
} while (!isTableActive && attempt <= maxAttempts);
} catch (final DatabaseException e) {
Scope.getCurrentScope().getLog(LockServiceCassandra.class)
.warning("Failed to check the status of table " + tableName + " in AWS Keyspaces", e);
return false;
}
return isTableActive;
}
}
Loading