Skip to content

Commit

Permalink
feat: Migration framework (#1768)
Browse files Browse the repository at this point in the history
This PR implemented the migration framework, which contains the command
line interface to execute migration and helps to migrate data easily.
Please read README.md for more information about this framework.
  • Loading branch information
cindyyan317 authored Dec 17, 2024
1 parent 15a441b commit 8dc7f16
Show file tree
Hide file tree
Showing 69 changed files with 4,395 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ add_subdirectory(etlng)
add_subdirectory(feed)
add_subdirectory(rpc)
add_subdirectory(web)
add_subdirectory(migration)
add_subdirectory(app)
add_subdirectory(main)
2 changes: 1 addition & 1 deletion src/app/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
add_library(clio_app)
target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp WebHandlers.cpp)

target_link_libraries(clio_app PUBLIC clio_etl clio_etlng clio_feed clio_web clio_rpc)
target_link_libraries(clio_app PUBLIC clio_etl clio_etlng clio_feed clio_web clio_rpc clio_migration)
10 changes: 10 additions & 0 deletions src/app/CliArgs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "app/CliArgs.hpp"

#include "migration/MigrationApplication.hpp"
#include "util/build/Build.hpp"

#include <boost/program_options/options_description.hpp>
Expand All @@ -45,6 +46,7 @@ CliArgs::parse(int argc, char const* argv[])
("version,v", "print version and exit")
("conf,c", po::value<std::string>()->default_value(defaultConfigPath), "configuration file")
("ng-web-server,w", "Use ng-web-server")
("migrate", po::value<std::string>(), "start migration helper")
;
// clang-format on
po::positional_options_description positional;
Expand All @@ -65,6 +67,14 @@ CliArgs::parse(int argc, char const* argv[])
}

auto configPath = parsed["conf"].as<std::string>();

if (parsed.count("migrate") != 0u) {
auto const opt = parsed["migrate"].as<std::string>();
if (opt == "status")
return Action{Action::Migrate{std::move(configPath), MigrateSubCmd::status()}};
return Action{Action::Migrate{std::move(configPath), MigrateSubCmd::migration(opt)}};
}

return Action{Action::Run{.configPath = std::move(configPath), .useNgWebServer = parsed.count("ng-web-server") != 0}
};
}
Expand Down
12 changes: 10 additions & 2 deletions src/app/CliArgs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include "migration/MigrationApplication.hpp"
#include "util/OverloadSet.hpp"

#include <string>
Expand Down Expand Up @@ -52,13 +53,20 @@ class CliArgs {
int exitCode; ///< Exit code.
};

/** @brief Migration action. */
struct Migrate {
std::string configPath;
MigrateSubCmd subCmd;
};

/**
* @brief Construct an action from a Run.
*
* @param action Run action.
*/
template <typename ActionType>
requires std::is_same_v<ActionType, Run> or std::is_same_v<ActionType, Exit>
requires std::is_same_v<ActionType, Run> or std::is_same_v<ActionType, Exit> or
std::is_same_v<ActionType, Migrate>
explicit Action(ActionType&& action) : action_(std::forward<ActionType>(action))
{
}
Expand All @@ -78,7 +86,7 @@ class CliArgs {
}

private:
std::variant<Run, Exit> action_;
std::variant<Run, Exit, Migrate> action_;
};

/**
Expand Down
19 changes: 19 additions & 0 deletions src/data/BackendInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,16 @@ class BackendInterface {
boost::asio::yield_context yield
) const;

/**
* @brief Fetches the status of migrator by name.
*
* @param migratorName The name of the migrator
* @param yield The coroutine context
* @return The status of the migrator if found; nullopt otherwise
*/
virtual std::optional<std::string>
fetchMigratorStatus(std::string const& migratorName, boost::asio::yield_context yield) const = 0;

/**
* @brief Synchronously fetches the ledger range from DB.
*
Expand Down Expand Up @@ -673,6 +683,15 @@ class BackendInterface {
bool
finishWrites(std::uint32_t ledgerSequence);

/**
* @brief Mark the migration status of a migrator as Migrated in the database
*
* @param migratorName The name of the migrator
* @param status The status to set
*/
virtual void
writeMigratorStatus(std::string const& migratorName, std::string const& status) = 0;

/**
* @return true if database is overwhelmed; false otherwise
*/
Expand Down
34 changes: 32 additions & 2 deletions src/data/CassandraBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@ class BasicCassandraBackend : public BackendInterface {

SettingsProviderType settingsProvider_;
Schema<SettingsProviderType> schema_;

std::atomic_uint32_t ledgerSequence_ = 0u;

protected:
Handle handle_;

// have to be mutable because BackendInterface constness :(
mutable ExecutionStrategyType executor_;

std::atomic_uint32_t ledgerSequence_ = 0u;

public:
/**
* @brief Create a new cassandra/scylla backend instance.
Expand Down Expand Up @@ -835,6 +837,26 @@ class BasicCassandraBackend : public BackendInterface {
return results;
}

std::optional<std::string>
fetchMigratorStatus(std::string const& migratorName, boost::asio::yield_context yield) const override
{
auto const res = executor_.read(yield, schema_->selectMigratorStatus, Text(migratorName));
if (not res) {
LOG(log_.error()) << "Could not fetch migrator status: " << res.error();
return {};
}

auto const& results = res.value();
if (not results) {
return {};
}

for (auto [statusString] : extract<std::string>(results))
return statusString;

return {};
}

void
doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override
{
Expand Down Expand Up @@ -962,6 +984,14 @@ class BasicCassandraBackend : public BackendInterface {
// probably was used in PG to start a transaction or smth.
}

void
writeMigratorStatus(std::string const& migratorName, std::string const& status) override
{
executor_.writeSync(
schema_->insertMigratorStatus, data::cassandra::Text{migratorName}, data::cassandra::Text(status)
);
}

bool
isTooBusy() const override
{
Expand Down
12 changes: 12 additions & 0 deletions src/data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,15 @@ CREATE TABLE clio.nf_token_transactions (
```

The `nf_token_transactions` table serves as the NFT counterpart to `account_tx`, inspired by the same motivations and fulfilling a similar role within this context. It drives the `nft_history` API.

### migrator_status

```
CREATE TABLE clio.migrator_status (
migrator_name TEXT, # The name of the migrator
status TEXT, # The status of the migrator
PRIMARY KEY (migrator_name)
)
```

The `migrator_status` table stores the status of the migratior in this database. If a migrator's status is `migrated`, it means this database has finished data migration for this migrator.
34 changes: 34 additions & 0 deletions src/data/cassandra/Schema.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,18 @@ class Schema {
qualifiedTableName(settingsProvider_.get(), "mp_token_holders")
));

statements.emplace_back(fmt::format(
R"(
CREATE TABLE IF NOT EXISTS {}
(
migrator_name TEXT,
status TEXT,
PRIMARY KEY (migrator_name)
)
)",
qualifiedTableName(settingsProvider_.get(), "migrator_status")
));

return statements;
}();

Expand Down Expand Up @@ -466,6 +478,17 @@ class Schema {
));
}();

PreparedStatement insertMigratorStatus = [this]() {
return handle_.get().prepare(fmt::format(
R"(
INSERT INTO {}
(migrator_name, status)
VALUES (?, ?)
)",
qualifiedTableName(settingsProvider_.get(), "migrator_status")
));
}();

//
// Select queries
//
Expand Down Expand Up @@ -768,6 +791,17 @@ class Schema {
qualifiedTableName(settingsProvider_.get(), "ledger_range")
));
}();

PreparedStatement selectMigratorStatus = [this]() {
return handle_.get().prepare(fmt::format(
R"(
SELECT status
FROM {}
WHERE migrator_name = ?
)",
qualifiedTableName(settingsProvider_.get(), "migrator_status")
));
}();
};

/**
Expand Down
1 change: 0 additions & 1 deletion src/data/cassandra/SettingsProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "util/Constants.hpp"
#include "util/newconfig/ObjectView.hpp"


#include <cerrno>
#include <chrono>
#include <cstddef>
Expand Down
22 changes: 22 additions & 0 deletions src/data/cassandra/Types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include <cstdint>
#include <expected>
#include <string>
#include <utility>

namespace data::cassandra {

Expand Down Expand Up @@ -55,6 +57,26 @@ struct Limit {
int32_t limit;
};

/**
* @brief A strong type wrapper for string
*
* This is unfortunately needed right now to support TEXT properly
* because clio uses string to represent BLOB
* If we want to bind TEXT with string, we need to use this type
*/
struct Text {
std::string text;

/**
* @brief Construct a new Text object from string type
*
* @param text The text to wrap
*/
explicit Text(std::string text) : text{std::move(text)}
{
}
};

class Handle;
class CassandraError;

Expand Down
3 changes: 3 additions & 0 deletions src/data/cassandra/impl/Statement.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class Statement : public ManagedObject<CassStatement> {
// reinterpret_cast is needed here :'(
auto const rc = bindBytes(reinterpret_cast<unsigned char const*>(value.data()), value.size());
throwErrorIfNeeded(rc, "Bind string (as bytes)");
} else if constexpr (std::is_convertible_v<DecayedType, Text>) {
auto const rc = cass_statement_bind_string_n(*this, idx, value.text.c_str(), value.text.size());
throwErrorIfNeeded(rc, "Bind string (as TEXT)");
} else if constexpr (std::is_same_v<DecayedType, UintTupleType> ||
std::is_same_v<DecayedType, UintByteTupleType>) {
auto const rc = cass_statement_bind_tuple(*this, idx, Tuple{std::forward<Type>(value)});
Expand Down
18 changes: 18 additions & 0 deletions src/main/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

#include "app/CliArgs.hpp"
#include "app/ClioApplication.hpp"
#include "migration/MigrationApplication.hpp"
#include "rpc/common/impl/HandlerProvider.hpp"
#include "util/TerminationHandler.hpp"
#include "util/config/Config.hpp"
#include "util/log/Logger.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
#include "util/newconfig/ConfigFileJson.hpp"
Expand Down Expand Up @@ -54,6 +56,22 @@ try {
util::LogService::init(ClioConfig);
app::ClioApplication clio{ClioConfig};
return clio.run(run.useNgWebServer);
},
[](app::CliArgs::Action::Migrate const& migrate) {
auto const json = ConfigFileJson::make_ConfigFileJson(migrate.configPath);
if (!json.has_value()) {
std::cerr << json.error().error << std::endl;
return EXIT_FAILURE;
}
auto const errors = ClioConfig.parse(json.value());
if (errors.has_value()) {
for (auto const& err : errors.value())
std::cerr << err.error << std::endl;
return EXIT_FAILURE;
}
util::LogService::init(ClioConfig);
app::MigratorApplication migrator{ClioConfig, migrate.subCmd};
return migrator.run();
}
);
} catch (std::exception const& e) {
Expand Down
8 changes: 8 additions & 0 deletions src/migration/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
add_library(clio_migration)

target_sources(
clio_migration PRIVATE MigrationApplication.cpp impl/MigrationManagerFactory.cpp MigratorStatus.cpp
cassandra/impl/ObjectsAdapter.cpp cassandra/impl/TransactionsAdapter.cpp
)

target_link_libraries(clio_migration PRIVATE clio_util clio_etl)
Loading

0 comments on commit 8dc7f16

Please sign in to comment.