Skip to content

Commit

Permalink
Merge pull request #2 from apple/master
Browse files Browse the repository at this point in the history
Merge upstream
  • Loading branch information
Daniel Smith authored Feb 7, 2020
2 parents adbb1b5 + 2d4f24a commit 7aebe05
Show file tree
Hide file tree
Showing 34 changed files with 772 additions and 409 deletions.
15 changes: 7 additions & 8 deletions bindings/c/fdb_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ int g_api_version = 0;
// Legacy (pre API version 610)
#define CLUSTER(c) ((char*)c)

/*
* While we could just use the MultiVersionApi instance directly, this #define allows us to swap in any other IClientApi instance (e.g. from ThreadSafeApi)
/*
* While we could just use the MultiVersionApi instance directly, this #define allows us to swap in any other IClientApi
* instance (e.g. from ThreadSafeApi)
*/
#define API ((IClientApi*)MultiVersionApi::api)

Expand Down Expand Up @@ -74,12 +75,10 @@ fdb_bool_t fdb_error_predicate( int predicate_test, fdb_error_t code ) {
code == error_code_cluster_version_changed;
}
if(predicate_test == FDBErrorPredicates::RETRYABLE_NOT_COMMITTED) {
return code == error_code_not_committed ||
code == error_code_transaction_too_old ||
code == error_code_future_version ||
code == error_code_database_locked ||
code == error_code_proxy_memory_limit_exceeded ||
code == error_code_process_behind;
return code == error_code_not_committed || code == error_code_transaction_too_old ||
code == error_code_future_version || code == error_code_database_locked ||
code == error_code_proxy_memory_limit_exceeded || code == error_code_batch_transaction_throttled ||
code == error_code_process_behind;
}
return false;
}
Expand Down
1 change: 1 addition & 0 deletions documentation/sphinx/source/mr-status-json-schemas.rst.inc
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@
"auto_proxies":3,
"auto_resolvers":1,
"auto_logs":3,
"backup_worker_enabled":1,
"proxies":5 // this field will be absent if a value has not been explicitly set
},
"data":{
Expand Down
6 changes: 4 additions & 2 deletions fdbbackup/FileConverter.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "fdbclient/MutationList.h"
#include "flow/flow.h"
#include "flow/serialize.h"
#include "flow/actorcompiler.h" // has to be last include

namespace file_converter {

Expand Down Expand Up @@ -315,10 +316,11 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg

if (!fp->mutations.empty() && fp->mutations.back().version.version >= minVersion) return Void();

state int64_t len;
try {
// Read block by block until we see the minVersion
loop {
state int64_t len = std::min<int64_t>(file.blockSize, file.fileSize - fp->offset);
len = std::min<int64_t>(file.blockSize, file.fileSize - fp->offset);
if (len == 0) {
fp->eof = true;
return Void();
Expand Down Expand Up @@ -593,4 +595,4 @@ int main(int argc, char** argv) {
TraceEvent(SevError, "MainError").error(unknown_error()).detail("RootException", e.what());
return FDB_EXIT_MAIN_EXCEPTION;
}
}
}
3 changes: 2 additions & 1 deletion fdbbackup/FileDecoder.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "fdbclient/MutationList.h"
#include "flow/flow.h"
#include "flow/serialize.h"
#include "flow/actorcompiler.h" // has to be last include

namespace file_converter {

Expand Down Expand Up @@ -437,4 +438,4 @@ int main(int argc, char** argv) {
TraceEvent(SevError, "MainError").error(unknown_error()).detail("RootException", e.what());
return FDB_EXIT_MAIN_EXCEPTION;
}
}
}
5 changes: 5 additions & 0 deletions fdbclient/BackupAgent.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,11 @@ class BackupConfig : public KeyBackedConfig {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}

// Set to true when all backup workers for saving mutation logs have been started.
KeyBackedProperty<bool> allWorkerStarted() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}

// Stop differntial logging if already started or don't start after completing KV ranges
KeyBackedProperty<bool> stopWhenDone() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
Expand Down
20 changes: 12 additions & 8 deletions fdbclient/BackupContainer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,17 +329,19 @@ class BackupContainerFileSystem : public IBackupContainer {
}

// The innermost folder covers 100,000 seconds (1e11 versions) which is 5,000 mutation log files at current settings.
static std::string logVersionFolderString(Version v) {
return format("logs/%s/", versionFolderString(v, 11).c_str());
static std::string logVersionFolderString(Version v, bool mlogs) {
return format("%s/%s/", (mlogs ? "mlogs" : "logs"), versionFolderString(v, 11).c_str());
}

Future<Reference<IBackupFile>> writeLogFile(Version beginVersion, Version endVersion, int blockSize) override {
return writeFile(logVersionFolderString(beginVersion) + format("log,%lld,%lld,%s,%d", beginVersion, endVersion, deterministicRandom()->randomUniqueID().toString().c_str(), blockSize));
return writeFile(logVersionFolderString(beginVersion, false) +
format("log,%lld,%lld,%s,%d", beginVersion, endVersion,
deterministicRandom()->randomUniqueID().toString().c_str(), blockSize));
}

Future<Reference<IBackupFile>> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize,
uint16_t tagId) override {
return writeFile(logVersionFolderString(beginVersion) +
return writeFile(logVersionFolderString(beginVersion, true) +
format("log,%lld,%lld,%s,%d,%d", beginVersion, endVersion,
deterministicRandom()->randomUniqueID().toString().c_str(), blockSize, tagId));
}
Expand Down Expand Up @@ -517,10 +519,12 @@ class BackupContainerFileSystem : public IBackupContainer {
// so start at an earlier version adjusted by how many versions a file could contain.
//
// Get the cleaned (without slashes) first and last folders that could contain relevant results.
std::string firstPath = cleanFolderString(logVersionFolderString(
std::max<Version>(0, beginVersion - CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE)
));
std::string lastPath = cleanFolderString(logVersionFolderString(targetVersion));
bool mlogs = false; // tagged mutation logs
std::string firstPath = cleanFolderString(
logVersionFolderString(std::max<Version>(0, beginVersion - CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES *
CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE),
mlogs));
std::string lastPath = cleanFolderString(logVersionFolderString(targetVersion, mlogs));

std::function<bool(std::string const &)> pathFilter = [=](const std::string &folderPath) {
// Remove slashes in the given folder path so that the '/' positions in the version folder string do not matter
Expand Down
4 changes: 4 additions & 0 deletions fdbclient/DatabaseConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void DatabaseConfiguration::resetInternal() {
tLogPolicy = storagePolicy = remoteTLogPolicy = Reference<IReplicationPolicy>();
remoteDesiredTLogCount = -1;
remoteTLogReplicationFactor = repopulateRegionAntiQuorum = 0;
backupWorkerEnabled = false;
}

void parse( int* i, ValueRef const& v ) {
Expand Down Expand Up @@ -322,6 +323,8 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
if (autoDesiredTLogCount != CLIENT_KNOBS->DEFAULT_AUTO_LOGS) {
result["auto_logs"] = autoDesiredTLogCount;
}

result["backup_worker_enabled"] = (int32_t)backupWorkerEnabled;
}

return result;
Expand Down Expand Up @@ -434,6 +437,7 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
else if (ck == LiteralStringRef("remote_logs")) parse(&remoteDesiredTLogCount, value);
else if (ck == LiteralStringRef("remote_log_replicas")) parse(&remoteTLogReplicationFactor, value);
else if (ck == LiteralStringRef("remote_log_policy")) parseReplicationPolicy(&remoteTLogPolicy, value);
else if (ck == LiteralStringRef("backup_worker_enabled")) { parse((&type), value); backupWorkerEnabled = (type != 0); }
else if (ck == LiteralStringRef("usable_regions")) parse(&usableRegions, value);
else if (ck == LiteralStringRef("repopulate_anti_quorum")) parse(&repopulateRegionAntiQuorum, value);
else if (ck == LiteralStringRef("regions")) parse(&regions, value);
Expand Down
3 changes: 3 additions & 0 deletions fdbclient/DatabaseConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ struct DatabaseConfiguration {
int32_t remoteTLogReplicationFactor;
Reference<IReplicationPolicy> remoteTLogPolicy;

// Backup Workers
bool backupWorkerEnabled;

//Data centers
int32_t usableRegions;
int32_t repopulateRegionAntiQuorum;
Expand Down
1 change: 1 addition & 0 deletions fdbclient/DatabaseContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAll
Counter transactionsMaybeCommitted;
Counter transactionsResourceConstrained;
Counter transactionsProcessBehind;
Counter transactionsThrottled;

ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit, bytesPerCommit;

Expand Down
90 changes: 83 additions & 7 deletions fdbclient/FileBackupAgent.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/Status.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/JsonBuilder.h"

Expand Down Expand Up @@ -899,6 +900,29 @@ namespace fileBackup {
return LiteralStringRef("OnSetAddTask");
}

// Clears the backup ID from "backupStartedKey" to pause backup workers.
ACTOR static Future<Void> clearBackupStartID(Reference<ReadYourWritesTransaction> tr, UID backupUid) {
// If backup worker is not enabled, exit early.
Optional<Value> started = wait(tr->get(backupStartedKey));
std::vector<std::pair<UID, Version>> ids;
if (started.present()) {
ids = decodeBackupStartedValue(started.get());
}
auto it = std::find_if(ids.begin(), ids.end(),
[=](const std::pair<UID, Version>& p) { return p.first == backupUid; });
if (it != ids.end()) {
ids.erase(it);
}

if (ids.empty()) {
TraceEvent("ClearBackup").detail("BackupID", backupUid);
tr->clear(backupStartedKey);
} else {
tr->set(backupStartedKey, encodeBackupStartedValue(ids));
}
return Void();
}

// Backup and Restore taskFunc definitions will inherit from one of the following classes which
// servers to catch and log to the appropriate config any error that execute/finish didn't catch and log.
struct RestoreTaskFuncBase : TaskFuncBase {
Expand Down Expand Up @@ -2148,8 +2172,8 @@ namespace fileBackup {

tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
state Key destUidValue = wait(backup.destUidValue().getOrThrow(tr));
wait( eraseLogData(tr, backup.getUidAsKey(), destUidValue) );
wait(eraseLogData(tr, backup.getUidAsKey(), destUidValue) && clearBackupStartID(tr, uid));

backup.stateEnum().set(tr, EBackupState::STATE_COMPLETED);

wait(taskBucket->finish(tr, task));
Expand Down Expand Up @@ -2346,8 +2370,8 @@ namespace fileBackup {
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
wait(checkTaskVersion(cx, task, StartFullBackupTaskFunc::name, StartFullBackupTaskFunc::version));

state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop{
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Expand All @@ -2361,7 +2385,56 @@ namespace fileBackup {
}
}

return Void();
// Check if backup worker is enabled
DatabaseConfiguration dbConfig = wait(getDatabaseConfiguration(cx));
if (!dbConfig.backupWorkerEnabled) {
wait(success(changeConfig(cx, "backup_worker_enabled:=1", true)));
}

// Set the "backupStartedKey" and wait for all backup worker started
tr->reset();
state BackupConfig config(task);
loop {
state Future<Void> watchFuture;
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Void> keepRunning = taskBucket->keepRunning(tr, task);

state Future<Optional<Value>> started = tr->get(backupStartedKey);
state Future<Optional<Value>> taskStarted = tr->get(config.allWorkerStarted().key);
wait(success(started) && success(taskStarted));

std::vector<std::pair<UID, Version>> ids;
if (started.get().present()) {
ids = decodeBackupStartedValue(started.get().get());
}
const UID uid = config.getUid();
auto it = std::find_if(ids.begin(), ids.end(),
[uid](const std::pair<UID, Version>& p) { return p.first == uid; });
if (it == ids.end()) {
ids.emplace_back(uid, Params.beginVersion().get(task));
} else {
Params.beginVersion().set(task, it->second);
}

tr->set(backupStartedKey, encodeBackupStartedValue(ids));

// The task may be restarted. Set the watch if started key has NOT been set.
if (!taskStarted.get().present()) {
watchFuture = tr->watch(config.allWorkerStarted().key);
}

wait(keepRunning);
wait(tr->commit());
if (!taskStarted.get().present()) {
wait(watchFuture);
}
return Void();
} catch (Error &e) {
wait(tr->onError(e));
}
}
}

ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
Expand Down Expand Up @@ -2396,6 +2469,7 @@ namespace fileBackup {
wait(success(FileBackupFinishedTask::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), backupFinished)));

wait(taskBucket->finish(tr, task));

return Void();
}

Expand Down Expand Up @@ -3790,7 +3864,8 @@ class FileBackupAgentImpl {

state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
wait(success(tr->getReadVersion()));
wait( eraseLogData(tr, config.getUidAsKey(), destUidValue) );
wait(eraseLogData(tr, config.getUidAsKey(), destUidValue) &&
fileBackup::clearBackupStartID(tr, config.getUid()));

config.stateEnum().set(tr, EBackupState::STATE_COMPLETED);

Expand Down Expand Up @@ -3829,8 +3904,9 @@ class FileBackupAgentImpl {

// Cancel backup task through tag
wait(tag.cancel(tr));

wait(eraseLogData(tr, config.getUidAsKey(), destUidValue));

wait(eraseLogData(tr, config.getUidAsKey(), destUidValue) &&
fileBackup::clearBackupStartID(tr, config.getUid()));

config.stateEnum().set(tr, EBackupState::STATE_ABORTED);

Expand Down
Loading

0 comments on commit 7aebe05

Please sign in to comment.