diff --git a/bindings/go/src/fdb/doc.go b/bindings/go/src/fdb/doc.go index 975970bc338..741be4e6394 100644 --- a/bindings/go/src/fdb/doc.go +++ b/bindings/go/src/fdb/doc.go @@ -25,7 +25,7 @@ Package fdb provides an interface to FoundationDB databases (version 2.0 or high To build and run programs using this package, you must have an installed copy of the FoundationDB client libraries (version 2.0.0 or later), available for Linux, -Windows and OS X at https://www.foundationdb.org/download/. +Windows and OS X at https://github.com/apple/foundationdb/releases This documentation specifically applies to the FoundationDB Go binding. For more extensive guidance to programming with FoundationDB, as well as API diff --git a/bindings/java/pom.xml.in b/bindings/java/pom.xml.in index bde89709842..7ff3c896562 100644 --- a/bindings/java/pom.xml.in +++ b/bindings/java/pom.xml.in @@ -10,7 +10,7 @@ jar foundationdb-java - Java bindings for the FoundationDB database. These bindings require the FoundationDB client, which is under a different license. The client can be obtained from https://www.foundationdb.org/download/. + Java bindings for the FoundationDB database. These bindings require the FoundationDB client, which is under a different license. The client can be obtained from https://github.com/apple/foundationdb/releases 2010 https://www.foundationdb.org @@ -26,7 +26,7 @@ - http://0.0.0.0 + https://github.com/apple/foundationdb diff --git a/bindings/java/src/main/overview.html.in b/bindings/java/src/main/overview.html.in index ad9d8aa8b39..a0c5572f681 100644 --- a/bindings/java/src/main/overview.html.in +++ b/bindings/java/src/main/overview.html.in @@ -7,7 +7,7 @@ FoundationDB's Java bindings rely on native libraries that are installed as part FoundationDB client binaries installation (see Installing FoundationDB client binaries). The JAR can be downloaded from -our website +maven central and then added to your classpath.

Getting started
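As a rough illustration of the flow this section introduces (a minimal sketch, not part of this change): once the JAR from Maven Central is on the classpath and the client libraries are installed, a first program with the Java binding typically selects an API version and opens the default cluster. The API version (710 below) and the class name are illustrative assumptions.

    import com.apple.foundationdb.Database;
    import com.apple.foundationdb.FDB;
    import com.apple.foundationdb.tuple.Tuple;

    public class HelloFoundationDB {
        public static void main(String[] args) {
            // The selected API version must not exceed what the installed client supports.
            FDB fdb = FDB.selectAPIVersion(710);
            try (Database db = fdb.open()) { // opens the default cluster file
                // run() executes the lambda inside the binding's standard retry loop.
                db.run(tr -> {
                    tr.set(Tuple.from("hello").pack(), Tuple.from("world").pack());
                    return null;
                });
                byte[] value = db.run(tr -> tr.get(Tuple.from("hello").pack()).join());
                System.out.println("hello -> " + Tuple.fromBytes(value).getString(0));
            }
        }
    }

The run() helper handles retryable errors, which is the pattern the rest of the getting-started material builds on.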

diff --git a/bindings/python/README.rst b/bindings/python/README.rst index d3e2d696048..782e03896f3 100644 --- a/bindings/python/README.rst +++ b/bindings/python/README.rst @@ -1,3 +1,3 @@ Complete documentation of the FoundationDB Python API can be found at https://apple.github.io/foundationdb/api-python.html. -These bindings require the FoundationDB client. The client can be obtained from https://www.foundationdb.org/download/. +These bindings require the FoundationDB client. The client can be obtained from https://github.com/apple/foundationdb/releases diff --git a/bindings/ruby/fdb.gemspec.cmake b/bindings/ruby/fdb.gemspec.cmake index 0049a973303..dbbdafd6701 100644 --- a/bindings/ruby/fdb.gemspec.cmake +++ b/bindings/ruby/fdb.gemspec.cmake @@ -18,5 +18,5 @@ EOF s.license = 'Apache-2.0' s.add_dependency('ffi', '~> 1.1', '>= 1.1.5') s.required_ruby_version = '>= 1.9.3' - s.requirements << 'These bindings require the FoundationDB client. The client can be obtained from https://www.foundationdb.org/download/.' + s.requirements << 'These bindings require the FoundationDB client. The client can be obtained from https://github.com/apple/foundationdb/releases' end diff --git a/bindings/ruby/fdb.gemspec.in b/bindings/ruby/fdb.gemspec.in index 092bc2eacda..fa61f8aba4b 100644 --- a/bindings/ruby/fdb.gemspec.in +++ b/bindings/ruby/fdb.gemspec.in @@ -18,5 +18,5 @@ EOF s.license = 'Apache-2.0' s.add_dependency('ffi', '~> 1.1', '>= 1.1.5') s.required_ruby_version = '>= 1.9.3' - s.requirements << 'These bindings require the FoundationDB client. The client can be obtained from https://www.foundationdb.org/download/.' + s.requirements << 'These bindings require the FoundationDB client. The client can be obtained from https://github.com/apple/foundationdb/releases' end diff --git a/cmake/CompileRocksDB.cmake b/cmake/CompileRocksDB.cmake index 2a4413dc835..43ca7298231 100644 --- a/cmake/CompileRocksDB.cmake +++ b/cmake/CompileRocksDB.cmake @@ -22,8 +22,8 @@ set(RocksDB_CMAKE_ARGS -DFAIL_ON_WARNINGS=OFF -DWITH_GFLAGS=OFF -DWITH_TESTS=OFF - -DWITH_TOOLS=OFF - -DWITH_CORE_TOOLS=OFF + -DWITH_TOOLS=${ROCKSDB_TOOLS} + -DWITH_CORE_TOOLS=${ROCKSDB_TOOLS} -DWITH_BENCHMARK_TOOLS=OFF -DWITH_BZ2=OFF -DWITH_LZ4=ON diff --git a/cmake/ConfigureCompiler.cmake b/cmake/ConfigureCompiler.cmake index c3384ef37c5..6e1bdb52b2d 100644 --- a/cmake/ConfigureCompiler.cmake +++ b/cmake/ConfigureCompiler.cmake @@ -164,6 +164,13 @@ else() add_compile_options(-fno-omit-frame-pointer) + if(CLANG) + # The default DWARF 5 format does not play nicely with GNU Binutils 2.39 and earlier, resulting + # in tools like addr2line omitting line numbers. We can consider removing this once we are able + # to use a version that has a fix. + add_compile_options(-gdwarf-4) + endif() + if(FDB_RELEASE OR FULL_DEBUG_SYMBOLS OR CMAKE_BUILD_TYPE STREQUAL "Debug") # Configure with FULL_DEBUG_SYMBOLS=ON to generate all symbols for debugging with gdb # Also generating full debug symbols in release builds. CPack will strip them out @@ -174,13 +181,6 @@ else() add_compile_options(-ggdb1) endif() - if(CLANG) - # The default DWARF 5 format does not play nicely with GNU Binutils 2.39 and earlier, resulting - # in tools like addr2line omitting line numbers. We can consider removing this once we are able - # to use a version that has a fix. - add_compile_options(-gdwarf-4) - endif() - if(NOT FDB_RELEASE) # Enable compression of the debug sections. This reduces the size of the binaries several times. 
# We do not enable it release builds, because CPack fails to generate debuginfo packages when diff --git a/cmake/FDBComponents.cmake b/cmake/FDBComponents.cmake index 9a55045f0c8..e9594646af8 100644 --- a/cmake/FDBComponents.cmake +++ b/cmake/FDBComponents.cmake @@ -157,6 +157,7 @@ set(PORTABLE_ROCKSDB ON CACHE BOOL "Compile RocksDB in portable mode") # Set thi set(ROCKSDB_SSE42 OFF CACHE BOOL "Compile RocksDB with SSE42 enabled") set(ROCKSDB_AVX ${USE_AVX} CACHE BOOL "Compile RocksDB with AVX enabled") set(ROCKSDB_AVX2 OFF CACHE BOOL "Compile RocksDB with AVX2 enabled") +set(ROCKSDB_TOOLS OFF CACHE BOOL "Compile RocksDB tools") set(WITH_LIBURING OFF CACHE BOOL "Build with liburing enabled") # Set this to ON to include liburing # RocksDB is currently enabled by default for GCC but does not build with the latest # Clang. diff --git a/contrib/TestHarness2/test_harness/run.py b/contrib/TestHarness2/test_harness/run.py index 4d391a6cacd..d9b6f2a3626 100644 --- a/contrib/TestHarness2/test_harness/run.py +++ b/contrib/TestHarness2/test_harness/run.py @@ -393,7 +393,7 @@ def log_test_plan(self, out: SummaryTree): def delete_simdir(self): shutil.rmtree(self.temp_path / Path("simfdb")) - def _run_rocksdb_logtool(self): + def _run_joshua_logtool(self): """Calls Joshua LogTool to upload the test logs if 1) test failed 2) test is RocksDB related""" if not os.path.exists("joshua_logtool.py"): raise RuntimeError("joshua_logtool.py missing") @@ -407,7 +407,12 @@ def _run_rocksdb_logtool(self): str(self.temp_path), "--check-rocksdb", ] - subprocess.run(command, check=True) + result = subprocess.run(command, capture_output=True, text=True) + return { + "stdout": result.stdout, + "stderr": result.stderr, + "exit_code": result.returncode, + } def run(self): command: List[str] = [] @@ -498,10 +503,16 @@ def run(self): self.summary.was_killed = did_kill self.summary.valgrind_out_file = valgrind_file self.summary.error_out = err_out + if not self.summary.is_negative_test and not self.summary.ok(): + logtool_result = self._run_joshua_logtool() + if logtool_result["exit_code"] != 0: + child = SummaryTree("JoshuaLogTool") + child.attributes["ExitCode"] = str(logtool_result["exit_code"]) + child.attributes["StdOut"] = logtool_result["stdout"] + child.attributes["StdErr"] = logtool_result["stderr"] + self.summary.out.append(child) self.summary.summarize(self.temp_path, " ".join(command)) - if not self.summary.is_negative_test and not self.summary.ok(): - self._run_rocksdb_logtool() return self.summary.ok() diff --git a/contrib/joshua_logtool.py b/contrib/joshua_logtool.py index 479542a22a9..848ddcf5d97 100755 --- a/contrib/joshua_logtool.py +++ b/contrib/joshua_logtool.py @@ -1,6 +1,6 @@ #! /usr/bin/env python3 -"""rocksdb_logtool.py +"""joshua_logtool.py Provides uploading/downloading FoundationDB log files to Joshua cluster. """ @@ -129,7 +129,7 @@ def list_commands(ensemble_id: str): def _setup_args(): - parser = argparse.ArgumentParser(prog="rocksdb_logtool.py") + parser = argparse.ArgumentParser(prog="joshua_logtool.py") parser.add_argument( "--cluster-file", type=str, default=None, help="Joshua FDB cluster file" diff --git a/design/global-tag-throttling.md b/design/global-tag-throttling.md index 91662462120..4fc90d9cec6 100644 --- a/design/global-tag-throttling.md +++ b/design/global-tag-throttling.md @@ -80,7 +80,7 @@ The ratekeeper must also track the rate of transactions performed with each tag. 
### Average Cost Calculation Quotas are expressed in terms of cost, but because throttling is enforced at the beginning of transactions, budgets need to be calculated in terms of transactions per second. To make this conversion, it is necessary to track the average cost of transactions (per-tag, and per-tag on a particular storage server). -Both cost and transaction counters are smoothed using the `Smoother` class to provide stability over time. The "smoothing interval" can be modified through `SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME`. +Both cost and transaction counters are exponentially smoothed over time, with knob-configurable smoothing intervals. ### Reserved Rate Calculation The global tag throttler periodically reads reserved quotas from the system keyspace. Using these reserved quotas and the average cost of transactions with the given tag, a reserved TPS rate is computed. Read and write rates are aggregated as follows: diff --git a/fdbcli/GetAuditStatusCommand.actor.cpp b/fdbcli/GetAuditStatusCommand.actor.cpp index 5256f5f64b7..30962fe484c 100644 --- a/fdbcli/GetAuditStatusCommand.actor.cpp +++ b/fdbcli/GetAuditStatusCommand.actor.cpp @@ -63,7 +63,7 @@ ACTOR Future getAuditStatusCommandActor(Database cx, std::vectorTOO_MANY; if (tokens.size() == 4) { diff --git a/fdbclient/AuditUtils.actor.cpp b/fdbclient/AuditUtils.actor.cpp index ba43a34d5fa..7a96f2d8555 100644 --- a/fdbclient/AuditUtils.actor.cpp +++ b/fdbclient/AuditUtils.actor.cpp @@ -119,26 +119,28 @@ ACTOR Future> getAuditStates(Database cx, state std::vector auditStates; state Key readBegin; state Key readEnd; - state RangeResult res; state Reverse reverse = newFirst ? Reverse::True : Reverse::False; - + if (num.present() && num.get() == 0) { + return auditStates; + } loop { try { readBegin = auditKeyRange(auditType).begin; readEnd = auditKeyRange(auditType).end; auditStates.clear(); while (true) { - res.clear(); tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); KeyRangeRef rangeToRead(readBegin, readEnd); - if (num.present()) { - wait(store(res, tr.getRange(rangeToRead, num.get(), Snapshot::False, reverse))); - } else { - wait(store(res, tr.getRange(rangeToRead, GetRangeLimits(), Snapshot::False, reverse))); - } + state RangeResult res = wait(tr.getRange(rangeToRead, + num.present() ? GetRangeLimits(num.get()) : GetRangeLimits(), + Snapshot::False, + reverse)); for (int i = 0; i < res.size(); ++i) { auditStates.push_back(decodeAuditStorageState(res[i].value)); + if (num.present() && auditStates.size() == num.get()) { + return auditStates; // since res.more is not reliable when GetRangeLimits is set to 1 + } } if (!res.more) { break; diff --git a/fdbclient/DatabaseBackupAgent.actor.cpp b/fdbclient/DatabaseBackupAgent.actor.cpp index 3af962ca09a..4cc18eef2a5 100644 --- a/fdbclient/DatabaseBackupAgent.actor.cpp +++ b/fdbclient/DatabaseBackupAgent.actor.cpp @@ -26,7 +26,9 @@ #include "fdbclient/NativeAPI.actor.h" #include #include +#include "fdbrpc/simulator.h" #include "flow/IAsyncFile.h" +#include "flow/flow.h" #include "flow/genericactors.actor.h" #include "flow/Hash3.h" #include @@ -361,8 +363,10 @@ struct BackupRangeTaskFunc : TaskFuncBase { if ((!prevAdjacent || !nextAdjacent) && rangeCount > ((prevAdjacent || nextAdjacent) ? 
CLIENT_KNOBS->BACKUP_MAP_KEY_UPPER_LIMIT - : CLIENT_KNOBS->BACKUP_MAP_KEY_LOWER_LIMIT)) { - CODE_PROBE(true, "range insert delayed because too versionMap is too large"); + : CLIENT_KNOBS->BACKUP_MAP_KEY_LOWER_LIMIT) && + (!g_network->isSimulated() || + (isBuggifyEnabled(BuggifyType::General) && !g_simulator->speedUpSimulation))) { + CODE_PROBE(true, "range insert delayed because versionMap is too large"); if (rangeCount > CLIENT_KNOBS->BACKUP_MAP_KEY_UPPER_LIMIT) TraceEvent(SevWarnAlways, "DBA_KeyRangeMapTooLarge").log(); diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index 1e1fa027761..b89a5229c82 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -3800,12 +3800,13 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase { } // First and last key are the range for this file - state KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key); + state KeyRange fileRange; state std::vector originalFileRanges; // If fileRange doesn't intersect restore range then we're done. state int index; for (index = 0; index < restoreRanges.get().size(); index++) { auto& restoreRange = restoreRanges.get()[index]; + fileRange = KeyRangeRef(blockData.front().key, blockData.back().key); if (!fileRange.intersects(restoreRange)) continue; diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 91583e11cfd..fcaac756f8d 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -180,8 +180,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true; init( ENABLE_DD_PHYSICAL_SHARD, false ); // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true; When true, optimization of data move between DCs is disabled - init( ENABLE_DD_PHYSICAL_SHARD_MOVE, false ); if( isSimulated ) ENABLE_DD_PHYSICAL_SHARD_MOVE = deterministicRandom()->coinflip(); - init( DD_PHYSICAL_SHARD_MOVE_PROBABILITY, 0.5 ); + init( DD_PHYSICAL_SHARD_MOVE_PROBABILITY, 0.0 ); if( isSimulated ) DD_PHYSICAL_SHARD_MOVE_PROBABILITY = 0.5; init( MAX_PHYSICAL_SHARD_BYTES, 10000000 ); // 10 MB; for ENABLE_DD_PHYSICAL_SHARD; smaller leads to larger number of physicalShard per storage server init( PHYSICAL_SHARD_METRICS_DELAY, 300.0 ); // 300 seconds; for ENABLE_DD_PHYSICAL_SHARD init( ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME, 600.0 ); if( randomize && BUGGIFY ) ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME = 0.0; // 600 seconds; for ENABLE_DD_PHYSICAL_SHARD @@ -830,23 +829,21 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( AUTO_TAG_THROTTLE_UPDATE_FREQUENCY, 10.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_UPDATE_FREQUENCY = 0.5; init( TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL, 30.0 ); if(randomize && BUGGIFY) TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL = 1.0; init( AUTO_TAG_THROTTLING_ENABLED, true ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLING_ENABLED = false; - init( SS_THROTTLE_TAGS_TRACKED, 1 ); if(randomize && BUGGIFY) SS_THROTTLE_TAGS_TRACKED = deterministicRandom()->randomInt(1, 10); + init( SS_THROTTLE_TAGS_TRACKED, 5 ); if(randomize && BUGGIFY) SS_THROTTLE_TAGS_TRACKED = deterministicRandom()->randomInt(1, 10); init( GLOBAL_TAG_THROTTLING, true ); if(isSimulated) GLOBAL_TAG_THROTTLING = deterministicRandom()->coinflip(); init( ENFORCE_TAG_THROTTLING_ON_PROXIES, GLOBAL_TAG_THROTTLING ); init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 ); - // 10 
seconds was chosen as a default value to ensure that - // the global tag throttler does not react too drastically to - // changes in workload. To make the global tag throttler more reactive, - // lower this knob. To make global tag throttler more smooth, raise this knob. - // Setting this knob lower than TAG_MEASUREMENT_INTERVAL can cause erratic - // behaviour and is not recommended. - init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 ); init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 ); init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 ); init( GLOBAL_TAG_THROTTLING_PROXY_LOGGING_INTERVAL, 60.0 ); init( GLOBAL_TAG_THROTTLING_TRACE_INTERVAL, 5.0 ); init( GLOBAL_TAG_THROTTLING_REPORT_ONLY, false ); + init( GLOBAL_TAG_THROTTLING_TARGET_RATE_FOLDING_TIME, 10.0 ); + init( GLOBAL_TAG_THROTTLING_TRANSACTION_COUNT_FOLDING_TIME, 2.0 ); + init( GLOBAL_TAG_THROTTLING_TRANSACTION_RATE_FOLDING_TIME, 10.0 ); + init( GLOBAL_TAG_THROTTLING_COST_FOLDING_TIME, 10.0 ); + //Storage Metrics init( STORAGE_METRICS_AVERAGE_INTERVAL, 120.0 ); init( STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, 1000.0 / STORAGE_METRICS_AVERAGE_INTERVAL ); // milliHz! @@ -879,7 +876,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( SERVE_AUDIT_STORAGE_PARALLELISM, 1 ); init( PERSIST_FINISH_AUDIT_COUNT, 10 ); if ( isSimulated ) PERSIST_FINISH_AUDIT_COUNT = 1; init( AUDIT_RETRY_COUNT_MAX, 100 ); if ( isSimulated ) AUDIT_RETRY_COUNT_MAX = 10; - init( CONCURRENT_AUDIT_TASK_COUNT_MAX, 50 ); if ( isSimulated ) CONCURRENT_AUDIT_TASK_COUNT_MAX = deterministicRandom()->randomInt(1, CONCURRENT_AUDIT_TASK_COUNT_MAX+1); + init( CONCURRENT_AUDIT_TASK_COUNT_MAX, 10 ); if ( isSimulated ) CONCURRENT_AUDIT_TASK_COUNT_MAX = deterministicRandom()->randomInt(1, CONCURRENT_AUDIT_TASK_COUNT_MAX+1); init( BUGGIFY_BLOCK_BYTES, 10000 ); init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS ); init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000; diff --git a/fdbclient/include/fdbclient/Audit.h b/fdbclient/include/fdbclient/Audit.h index ce9ae90a6b5..fc1e591b195 100644 --- a/fdbclient/include/fdbclient/Audit.h +++ b/fdbclient/include/fdbclient/Audit.h @@ -64,24 +64,10 @@ struct AuditStorageState { inline void setPhase(AuditPhase phase) { this->phase = static_cast(phase); } inline AuditPhase getPhase() const { return static_cast(this->phase); } - // for fdbcli get_audit_status - std::string toStringForCLI() const { - std::string res = "AuditStorageState: [ID]: " + id.toString() + - ", [Range]: " + Traceable::toString(range) + - ", [Type]: " + std::to_string(type) + ", [Phase]: " + std::to_string(phase); - if (!error.empty()) { - res += "[Error]: " + error; - } - - return res; - } - - // for traceevent std::string toString() const { std::string res = "AuditStorageState: [ID]: " + id.toString() + ", [Range]: " + Traceable::toString(range) + - ", [Type]: " + std::to_string(type) + ", [Phase]: " + std::to_string(phase) + - ", [AuditServerID]: " + auditServerId.toString(); + ", [Type]: " + std::to_string(type) + ", [Phase]: " + std::to_string(phase); if (!error.empty()) { res += "[Error]: " + error; } diff --git a/fdbclient/include/fdbclient/FDBTypes.h b/fdbclient/include/fdbclient/FDBTypes.h index e411844c9d3..f71f2b833f2 100644 --- a/fdbclient/include/fdbclient/FDBTypes.h +++ b/fdbclient/include/fdbclient/FDBTypes.h @@ -829,7 +829,7 @@ struct RangeResultRef : VectorRef { serializer(ar, ((VectorRef&)*this), more, readThrough, readToBegin, 
readThroughEnd); } - int logicalSize() const { + int64_t logicalSize() const { return VectorRef::expectedSize() - VectorRef::size() * sizeof(KeyValueRef); } diff --git a/fdbclient/include/fdbclient/GetEncryptCipherKeys_impl.actor.h b/fdbclient/include/fdbclient/GetEncryptCipherKeys_impl.actor.h index acefa39f350..b40211cf099 100644 --- a/fdbclient/include/fdbclient/GetEncryptCipherKeys_impl.actor.h +++ b/fdbclient/include/fdbclient/GetEncryptCipherKeys_impl.actor.h @@ -37,6 +37,8 @@ #include "flow/actorcompiler.h" // This must be the last #include. +#define DEBUG_GET_CIPHER false + template Optional _getEncryptKeyProxyInterface(const Reference const>& db) { if constexpr (std::is_same_v) { @@ -62,9 +64,13 @@ Future _onEncryptKeyProxyChange(Reference const> db) { break; } } - TraceEvent("GetEncryptCipherKeysEncryptKeyProxyChanged") - .detail("PreviousProxyId", previousProxyId.orDefault(UID())) - .detail("CurrentProxyId", currentProxyId.orDefault(UID())); + + if (DEBUG_GET_CIPHER) { + TraceEvent(SevDebug, "GetEncryptCipherKeysEncryptKeyProxyChanged") + .detail("PreviousProxyId", previousProxyId.orDefault(UID())) + .detail("CurrentProxyId", currentProxyId.orDefault(UID())); + } + return Void(); } @@ -75,7 +81,10 @@ Future _getUncachedLatestEncryptCipherKeys(Refe Optional proxy = _getEncryptKeyProxyInterface(db); if (!proxy.present()) { // Wait for onEncryptKeyProxyChange. - TraceEvent("GetLatestEncryptCipherKeysEncryptKeyProxyNotPresent").detail("UsageType", toString(usageType)); + if (DEBUG_GET_CIPHER) { + TraceEvent(SevDebug, "GetLatestEncryptCipherKeysEncryptKeyProxyNotPresent") + .detail("UsageType", toString(usageType)); + } return Never(); } request.reply.reset(); @@ -178,7 +187,10 @@ Future _getUncachedEncryptCipherKeys(Reference proxy = _getEncryptKeyProxyInterface(db); if (!proxy.present()) { // Wait for onEncryptKeyProxyChange. - TraceEvent("GetEncryptCipherKeysEncryptKeyProxyNotPresent").detail("UsageType", toString(usageType)); + if (DEBUG_GET_CIPHER) { + TraceEvent(SevDebug, "GetEncryptCipherKeysEncryptKeyProxyNotPresent") + .detail("UsageType", toString(usageType)); + } return Never(); } request.reply.reset(); diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index fa6ca86d41c..f02abfa5053 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -204,7 +204,6 @@ class ServerKnobs : public KnobsImpl { bool SHARD_ENCODE_LOCATION_METADATA; // If true, location metadata will contain shard ID. bool ENABLE_DD_PHYSICAL_SHARD; // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true. - bool ENABLE_DD_PHYSICAL_SHARD_MOVE; // Enable physical shard move. double DD_PHYSICAL_SHARD_MOVE_PROBABILITY; // Percentage of physical shard move, in the range of [0, 1]. int64_t MAX_PHYSICAL_SHARD_BYTES; double PHYSICAL_SHARD_METRICS_DELAY; @@ -763,8 +762,6 @@ class ServerKnobs : public KnobsImpl { // To protect against this, we do not compute the average cost when the // measured tps drops below this threshold double GLOBAL_TAG_THROTTLING_MIN_RATE; - // Used by global tag throttling counters - double GLOBAL_TAG_THROTTLING_FOLDING_TIME; // Maximum number of tags tracked by global tag throttler. Additional tags will be ignored // until some existing tags expire int64_t GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED; @@ -780,6 +777,11 @@ class ServerKnobs : public KnobsImpl { // enforcement. 
bool GLOBAL_TAG_THROTTLING_REPORT_ONLY; + double GLOBAL_TAG_THROTTLING_TARGET_RATE_FOLDING_TIME; + double GLOBAL_TAG_THROTTLING_TRANSACTION_COUNT_FOLDING_TIME; + double GLOBAL_TAG_THROTTLING_TRANSACTION_RATE_FOLDING_TIME; + double GLOBAL_TAG_THROTTLING_COST_FOLDING_TIME; + double MAX_TRANSACTIONS_PER_BYTE; int64_t MIN_AVAILABLE_SPACE; diff --git a/fdbclient/include/fdbclient/StorageServerInterface.h b/fdbclient/include/fdbclient/StorageServerInterface.h index 8f6da386fe7..4057112d17a 100644 --- a/fdbclient/include/fdbclient/StorageServerInterface.h +++ b/fdbclient/include/fdbclient/StorageServerInterface.h @@ -1168,23 +1168,27 @@ struct GetStorageMetricsRequest { } }; -struct StorageQueuingMetricsReply { - struct TagInfo { - constexpr static FileIdentifier file_identifier = 4528694; - TransactionTag tag; - double rate{ 0.0 }; - double fractionalBusyness{ 0.0 }; - - TagInfo() = default; - TagInfo(TransactionTag const& tag, double rate, double fractionalBusyness) - : tag(tag), rate(rate), fractionalBusyness(fractionalBusyness) {} - - template - void serialize(Ar& ar) { - serializer(ar, tag, rate, fractionalBusyness); - } - }; +// Tracks the busyness of tags on individual storage servers. +struct BusyTagInfo { + constexpr static FileIdentifier file_identifier = 4528694; + TransactionTag tag; + double rate{ 0.0 }; + double fractionalBusyness{ 0.0 }; + + BusyTagInfo() = default; + BusyTagInfo(TransactionTag const& tag, double rate, double fractionalBusyness) + : tag(tag), rate(rate), fractionalBusyness(fractionalBusyness) {} + + bool operator<(BusyTagInfo const& rhs) const { return rate < rhs.rate; } + bool operator>(BusyTagInfo const& rhs) const { return rate > rhs.rate; } + + template + void serialize(Ar& ar) { + serializer(ar, tag, rate, fractionalBusyness); + } +}; +struct StorageQueuingMetricsReply { constexpr static FileIdentifier file_identifier = 7633366; double localTime; int64_t instanceID; // changes if bytesDurable and bytesInput reset @@ -1195,7 +1199,7 @@ struct StorageQueuingMetricsReply { double cpuUsage{ 0.0 }; double diskUsage{ 0.0 }; double localRateLimit; - std::vector busiestTags; + std::vector busiestTags; template void serialize(Ar& ar) { diff --git a/fdbrpc/include/fdbrpc/Smoother.h b/fdbrpc/include/fdbrpc/Smoother.h index d806a3d2aa0..e79ecbdd048 100644 --- a/fdbrpc/include/fdbrpc/Smoother.h +++ b/fdbrpc/include/fdbrpc/Smoother.h @@ -23,6 +23,8 @@ #include "flow/flow.h" #include +// Implements a basic exponential smoothing algorithm +// (see https://en.wikipedia.org/wiki/Exponential_smoothing#Basic_(simple)_exponential_smoothing) template class SmootherImpl { // Times (t) are expected to be nondecreasing @@ -77,3 +79,71 @@ class TimerSmoother : public SmootherImpl { static double now() { return timer(); } explicit TimerSmoother(double eFoldingTime) : SmootherImpl(eFoldingTime) {} }; + +// Implements a Holt linear smoothing algorithm +// (see https://en.wikipedia.org/wiki/Exponential_smoothing#Double_exponential_smoothing_(Holt_linear)) +// This is more accurate than Smoother for metrics that have a trend. 
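+// In effect (per the implementation below), each update folds the observed total into a level estimate that is
+// extrapolated by a smoothed trend:
+//   smoothTotal(t) = alpha * total + (1 - alpha) * (lastEstimate + elapsed * lastRateEstimate),  alpha = 1 - exp(-elapsed / eDataFoldingTime)
+//   smoothRate(t)  = beta * (smoothTotal(t) - lastEstimate) / elapsed + (1 - beta) * lastRateEstimate,  beta = 1 - exp(-elapsed / eTrendFoldingTime)
+// so the data and the trend each have their own e-folding time.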
+template +class HoltLinearSmootherImpl { + // Times (t) are expected to be nondecreasing + double eDataFoldingTime, eTrendFoldingTime; + double total, lastEstimate, lastRateEstimate, lastTime; + +protected: + explicit HoltLinearSmootherImpl(double eDataFoldingTime, double eTrendFoldingTime) + : eDataFoldingTime(eDataFoldingTime), eTrendFoldingTime(eTrendFoldingTime) { + reset(0); + } + +public: + void reset(double value) { + total = value; + lastEstimate = value; + lastRateEstimate = 0; + lastTime = 0; + } + + void setTotal(double total, double t = T::now()) { addDelta(total - this->total, t); } + + void addDelta(double delta, double t = T::now()) { + double const elapsed = t - lastTime; + if (elapsed) { + double const rateEstimate = smoothRate(); + lastEstimate = smoothTotal(); + lastRateEstimate = rateEstimate; + lastTime = t; + } + this->total += delta; + } + + double smoothTotal(double t = T::now()) const { + double const elapsed = t - lastTime; + double const alpha = 1 - exp(-elapsed / eDataFoldingTime); + return alpha * total + (1 - alpha) * (lastEstimate + elapsed * lastRateEstimate); + } + + double smoothRate(double t = T::now()) const { + double const elapsed = t - lastTime; + if (elapsed) { + double const recentRate = (smoothTotal() - lastEstimate) / elapsed; + double const beta = 1 - exp(-elapsed / eTrendFoldingTime); + return beta * recentRate + (1 - beta) * lastRateEstimate; + } else { + return lastRateEstimate; + } + } + + double getTotal() const { return total; } +}; + +class HoltLinearSmoother : public HoltLinearSmootherImpl { +public: + static double now() { return ::now(); } + explicit HoltLinearSmoother(double eDataFoldingTime, double eTrendFoldingTime) + : HoltLinearSmootherImpl(eDataFoldingTime, eTrendFoldingTime) {} +}; +class HoltLinearTimerSmoother : public HoltLinearSmootherImpl { + static double now() { return timer(); } + explicit HoltLinearTimerSmoother(double eDataFoldingTime, double eTrendFoldingTime) + : HoltLinearSmootherImpl(eDataFoldingTime, eTrendFoldingTime) {} +}; diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index f0c23e6f68e..cdb6b473555 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -2368,6 +2368,23 @@ ACTOR Future getNextBMEpoch(ClusterControllerData* self) { } } +ACTOR Future stopConsistencyScan(Database db) { + state ConsistencyScanState cs = ConsistencyScanState(); + state Reference tr = makeReference(db); + loop { + try { + SystemDBWriteLockedNow(db.getReference())->setOptions(tr); + state ConsistencyScanState::Config config = wait(ConsistencyScanState().config().getD(tr)); + config.enabled = false; + cs.config().set(tr, config); + wait(tr->commit()); + return Void(); + } catch (Error& e) { + wait(tr->onError(e)); + } + } +} + ACTOR Future watchBlobRestoreCommand(ClusterControllerData* self) { state Reference restoreController = makeReference(self->cx, normalKeys); @@ -2393,7 +2410,9 @@ ACTOR Future watchBlobRestoreCommand(ClusterControllerData* self) { } } self->db.blobRestoreEnabled.set(phase > BlobRestorePhase::UNINIT && phase < BlobRestorePhase::DONE); - + if (self->db.blobRestoreEnabled.get()) { + wait(stopConsistencyScan(self->cx)); + } wait(BlobRestoreController::onPhaseChange(restoreController, BlobRestorePhase::INIT)); } catch (Error& e) { TraceEvent("WatchBlobRestoreCommand", self->id).error(e); diff --git a/fdbserver/DDRelocationQueue.actor.cpp b/fdbserver/DDRelocationQueue.actor.cpp index 9c0f957742f..819a67c4089 100644 --- 
a/fdbserver/DDRelocationQueue.actor.cpp +++ b/fdbserver/DDRelocationQueue.actor.cpp @@ -1047,7 +1047,7 @@ void DDQueue::launchQueuedWork(std::set rrs.dataMoveId = UID(); } else { const bool enabled = - deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY; + deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY; rrs.dataMoveId = newDataMoveId(deterministicRandom()->randomUInt64(), AssignEmptyRange::False, EnablePhysicalShardMove(enabled)); @@ -1635,7 +1635,7 @@ ACTOR Future dataDistributionRelocator(DDQueue* self, self->moveCreateNewPhysicalShard++; } const bool enabled = - deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY; + deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY; rd.dataMoveId = newDataMoveId( physicalShardIDCandidate, AssignEmptyRange::False, EnablePhysicalShardMove(enabled)); TraceEvent(SevInfo, "NewDataMoveWithPhysicalShard") diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 9c701816206..84af2ba1e4c 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -2571,7 +2571,8 @@ ACTOR Future doAuditOnStorageServer(Reference self, self->remainingBudgetForAuditTasks[auditType].set(self->remainingBudgetForAuditTasks[auditType].get() + 1); ASSERT(self->remainingBudgetForAuditTasks[auditType].get() <= SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX); - if (e.code() == error_code_actor_cancelled) { + if (e.code() == error_code_actor_cancelled || e.code() == error_code_not_implemented || + e.code() == error_code_audit_storage_exceeded_request_limit) { throw e; } else if (e.code() == error_code_audit_storage_error) { audit->foundError = true; diff --git a/fdbserver/GlobalTagThrottler.actor.cpp b/fdbserver/GlobalTagThrottler.actor.cpp index 6e23cf6c0e5..cf6a8a2ed58 100644 --- a/fdbserver/GlobalTagThrottler.actor.cpp +++ b/fdbserver/GlobalTagThrottler.actor.cpp @@ -100,8 +100,8 @@ class GlobalTagThrottlerImpl { public: ThroughputCounters() - : readCost(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME), - writeCost(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME) {} + : readCost(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_COST_FOLDING_TIME), + writeCost(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_COST_FOLDING_TIME) {} void updateCost(double newCost, OpType opType) { if (opType == OpType::READ) { @@ -117,7 +117,7 @@ class GlobalTagThrottlerImpl { // Track various statistics per tag, aggregated across all storage servers class PerTagStatistics { Optional quota; - Smoother transactionCounter; + HoltLinearSmoother transactionCounter; Smoother perClientRate; Smoother targetRate; double transactionsLastAdded; @@ -125,9 +125,11 @@ class GlobalTagThrottlerImpl { public: explicit PerTagStatistics() - : transactionCounter(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME), - perClientRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME), - targetRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME), transactionsLastAdded(now()), lastLogged(0) {} + : transactionCounter(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TRANSACTION_COUNT_FOLDING_TIME, + SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TRANSACTION_RATE_FOLDING_TIME), + perClientRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TARGET_RATE_FOLDING_TIME), + targetRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TARGET_RATE_FOLDING_TIME), transactionsLastAdded(now()), + lastLogged(0) {} Optional getQuota() const { return quota; } @@ -466,7 +468,7 @@ class GlobalTagThrottlerImpl { // 
Currently there is no differentiation between batch priority and default priority transactions TraceEvent te("GlobalTagThrottler_GotRate", id); bool const traceEnabled = stats.canLog(); - if (traceEnabled) { + if (!traceEnabled) { te.disable(); } bool isBusy{ false }; @@ -497,7 +499,8 @@ class GlobalTagThrottlerImpl { // Currently there is no differentiation between batch priority and default priority transactions bool isBusy{ false }; TraceEvent te("GlobalTagThrottler_GotClientRate", id); - if (!stats.canLog()) { + bool const traceEnabled = stats.canLog(); + if (!traceEnabled) { te.disable(); } auto const targetTps = getTargetTps(tag, isBusy, te); @@ -510,7 +513,9 @@ class GlobalTagThrottlerImpl { auto const clientRate = stats.updateAndGetPerClientLimit(targetTps.get()); result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = clientRate; te.detail("ClientTps", clientRate.tpsRate); - stats.updateLastLogged(); + if (traceEnabled) { + stats.updateLastLogged(); + } } else { te.disable(); } @@ -663,7 +668,7 @@ class MockStorageServer { Smoother smoother; public: - Cost() : smoother(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME) {} + Cost() : smoother(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_COST_FOLDING_TIME) {} Cost& operator+=(double delta) { smoother.addDelta(delta); return *this; diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp index 272163c005b..47844f29c15 100644 --- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp @@ -298,7 +298,7 @@ const char* ShardOpToString(ShardOp op) { } void logShardEvent(StringRef name, ShardOp op, Severity severity = SevInfo, const std::string& message = "") { TraceEvent e(severity, "ShardedRocksDBKVSShardEvent"); - e.detail("Name", name).detail("Action", ShardOpToString(op)); + e.detail("ShardId", name).detail("Action", ShardOpToString(op)); if (!message.empty()) { e.detail("Message", message); } @@ -309,7 +309,10 @@ void logShardEvent(StringRef name, Severity severity = SevInfo, const std::string& message = "") { TraceEvent e(severity, "ShardedRocksDBKVSShardEvent"); - e.detail("Name", name).detail("Action", ShardOpToString(op)).detail("Begin", range.begin).detail("End", range.end); + e.detail("ShardId", name) + .detail("Action", ShardOpToString(op)) + .detail("Begin", range.begin) + .detail("End", range.end); if (message != "") { e.detail("Message", message); } @@ -652,6 +655,7 @@ struct PhysicalShard { logRocksDBError(status, "AddCF"); return status; } + logShardEvent(id, ShardOp::OPEN); readIterPool = std::make_shared(db, cf, id); this->isInitialized.store(true); return status; @@ -665,7 +669,7 @@ struct PhysicalShard { rocksdb::ExportImportFilesMetaData metaData = getMetaData(checkpoint); if (metaData.files.empty()) { TraceEvent(SevInfo, "RocksDBRestoreEmptyShard") - .detail("Shard", id) + .detail("ShardId", id) .detail("CheckpointID", checkpoint.checkpointID); status = db->CreateColumnFamily(getCFOptions(), id, &cf); } else { @@ -697,12 +701,12 @@ struct PhysicalShard { status = db->IngestExternalFile(cf, sstFiles, ingestOptions); } else { TraceEvent(SevWarn, "RocksDBServeRestoreEmptyRange") - .detail("Shard", id) + .detail("ShardId", id) .detail("RocksKeyValuesCheckpoint", rcp.toString()) .detail("Checkpoint", checkpoint.toString()); } TraceEvent(SevInfo, "PhysicalShardRestoredFiles") - .detail("Shard", id) + .detail("ShardId", id) .detail("CFName", cf->GetName()) .detail("Checkpoint", checkpoint.toString()) 
.detail("RestoredFiles", describe(sstFiles)); @@ -765,11 +769,7 @@ struct PhysicalShard { readIterPool.reset(); // Deleting default column family is not allowed. - if (id == DEFAULT_CF_NAME) { - return; - } - - if (deletePending) { + if (deletePending && id != DEFAULT_CF_NAME) { auto s = db->DropColumnFamily(cf); if (!s.ok()) { logRocksDBError(s, "DestroyShard"); @@ -916,7 +916,7 @@ class ShardManager { rocksdb::ColumnFamilyMetaData cfMetadata; shard->db->GetColumnFamilyMetaData(shard->cf, &cfMetadata); TraceEvent e(SevInfo, "PhysicalShardLevelStats"); - e.detail("PhysicalShardID", id); + e.detail("ShardId", id); std::string levelProp; for (auto it = cfMetadata.levels.begin(); it != cfMetadata.levels.end(); ++it) { std::string propValue = ""; @@ -939,7 +939,7 @@ class ShardManager { rocksdb::Status init() { const double start = now(); // Open instance. - TraceEvent(SevInfo, "ShardedRocksShardManagerInitBegin", this->logId).detail("DataPath", path); + TraceEvent(SevInfo, "ShardedRocksDBInitBegin", this->logId).detail("DataPath", path); if (SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0) { // Set rate limiter to a higher rate to avoid blocking storage engine initialization. auto rateLimiter = rocksdb::NewGenericRateLimiter((int64_t)5 << 30, // 5GB @@ -961,8 +961,6 @@ class ShardManager { descriptors.push_back(rocksdb::ColumnFamilyDescriptor(name, cfOptions)); } - ASSERT(foundMetadata || descriptors.size() == 0); - // Add default column family if it's a newly opened database. if (descriptors.size() == 0) { descriptors.push_back(rocksdb::ColumnFamilyDescriptor("default", cfOptions)); @@ -987,8 +985,7 @@ class ShardManager { } physicalShards[shard->id] = shard; columnFamilyMap[handle->GetID()] = handle; - TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId) - .detail("PhysicalShardID", shard->id); + TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId).detail("ShardId", shard->id); } std::set unusedShards(columnFamilies.begin(), columnFamilies.end()); @@ -1014,7 +1011,7 @@ class ShardManager { metadata[i + 1].key.removePrefix(shardMappingPrefix)); TraceEvent(SevVerbose, "DecodeShardMapping", this->logId) .detail("Range", range) - .detail("Name", name); + .detail("ShardId", name); // Empty name indicates the shard doesn't belong to the SS/KVS. 
if (name.empty()) { @@ -1050,13 +1047,13 @@ class ShardManager { } for (const auto& name : unusedShards) { - TraceEvent(SevDebug, "UnusedShardName", logId).detail("Name", name); auto it = physicalShards.find(name); ASSERT(it != physicalShards.end()); auto shard = it->second; if (shard->dataShards.size() == 0) { shard->deleteTimeSec = now(); pendingDeletionShards.push_back(name); + TraceEvent(SevInfo, "UnusedPhysicalShard", logId).detail("ShardId", name); } } if (unusedShards.size() > 0) { @@ -1101,7 +1098,7 @@ class ShardManager { if (SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0) { dbOptions.rate_limiter->SetBytesPerSecond(SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC); } - TraceEvent(SevInfo, "ShardedRocksShardManagerInitEnd", this->logId) + TraceEvent(SevInfo, "ShardedRocksDBInitEnd", this->logId) .detail("DataPath", path) .detail("Duration", now() - start); return status; @@ -1171,9 +1168,7 @@ class ShardManager { } PhysicalShard* addRange(KeyRange range, std::string id) { - TraceEvent(SevVerbose, "ShardedRocksAddRangeBegin", this->logId) - .detail("Range", range) - .detail("PhysicalShardID", id); + TraceEvent(SevVerbose, "ShardedRocksAddRangeBegin", this->logId).detail("Range", range).detail("ShardId", id); // Newly added range should not overlap with any existing range. auto ranges = dataShardMap.intersectingRanges(range); @@ -1210,15 +1205,13 @@ class ShardManager { validate(); - TraceEvent(SevInfo, "ShardedRocksDBRangeAdded", this->logId) - .detail("Range", range) - .detail("PhysicalShardID", id); + TraceEvent(SevInfo, "ShardedRocksDBRangeAdded", this->logId).detail("Range", range).detail("ShardId", id); return shard.get(); } std::vector removeRange(KeyRange range) { - TraceEvent(SevInfo, "ShardedRocksRemoveRangeBegin", this->logId).detail("Range", range); + TraceEvent(SevVerbose, "ShardedRocksRemoveRangeBegin", this->logId).detail("Range", range); std::vector shardIds; std::vector newShards; @@ -1241,7 +1234,7 @@ class ShardManager { auto bytesRead = readRangeInDb(existingShard, range, 1, UINT16_MAX, &rangeResult); if (bytesRead > 0) { TraceEvent(SevError, "ShardedRocksDBRangeNotEmpty") - .detail("PhysicalShard", existingShard->toString()) + .detail("ShardId", existingShard->toString()) .detail("Range", range) .detail("DataShardRange", shardRange); // Force clear range. @@ -1249,13 +1242,6 @@ class ShardManager { dirtyShards->insert(it.value()->physicalShard); } } - - TraceEvent(SevDebug, "ShardedRocksRemoveRange") - .detail("Range", range) - .detail("IntersectingRange", shardRange) - .detail("DataShardRange", it.value()->range) - .detail("PhysicalShard", existingShard->toString()); - ASSERT(it.value()->range == shardRange); // Ranges should be consistent. 
if (range.contains(shardRange)) { @@ -1263,9 +1249,9 @@ class ShardManager { TraceEvent(SevInfo, "ShardedRocksRemovedRange") .detail("Range", range) .detail("RemovedRange", shardRange) - .detail("PhysicalShard", existingShard->toString()); + .detail("ShardId", existingShard->toString()); if (existingShard->dataShards.size() == 0) { - TraceEvent(SevDebug, "ShardedRocksDB").detail("EmptyShardId", existingShard->id); + TraceEvent(SevInfo, "ShardedRocksDBEmptyShard").detail("ShardId", existingShard->id); shardIds.push_back(existingShard->id); existingShard->deleteTimeSec = now(); pendingDeletionShards.push_back(existingShard->id); @@ -1440,7 +1426,7 @@ class ShardManager { .detail("Action", "PersistRangeMapping") .detail("BeginKey", it.range().begin) .detail("EndKey", it.range().end) - .detail("PhysicalShardID", it.value()->physicalShard->id); + .detail("ShardId", it.value()->physicalShard->id); } else { // Empty range. @@ -1449,7 +1435,7 @@ class ShardManager { .detail("Action", "PersistRangeMapping") .detail("BeginKey", it.range().begin) .detail("EndKey", it.range().end) - .detail("PhysicalShardID", "None"); + .detail("ShardId", "None"); } lastKey = it.range().end; } @@ -1524,9 +1510,6 @@ class ShardManager { dbOptions.rate_limiter->SetBytesPerSecond((int64_t)5 << 30); } columnFamilyMap.clear(); - for (auto& [_, shard] : physicalShards) { - shard->deletePending = true; - } physicalShards.clear(); // Close DB. auto s = db->Close(); @@ -3240,6 +3223,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { } catch (Error& e) { TraceEvent(SevError, "ShardedRocksCloseReadThreadError").errorUnsuppressed(e); } + + TraceEvent("CloseKeyValueStore").detail("DeleteKVS", deleteOnClose); auto a = new Writer::CloseAction(&self->shardManager, deleteOnClose); auto f = a->done.getFuture(); self->writeThread->post(a); @@ -3663,6 +3648,7 @@ TEST_CASE("noSim/ShardedRocksDB/Initialization") { Future closed = kvStore->onClosed(); kvStore->dispose(); wait(closed); + ASSERT(!directoryExists(rocksDBTestDir)); return Void(); } @@ -3691,6 +3677,7 @@ TEST_CASE("noSim/ShardedRocksDB/SingleShardRead") { Future closed = kvStore->onClosed(); kvStore->dispose(); wait(closed); + ASSERT(!directoryExists(rocksDBTestDir)); return Void(); } @@ -3849,7 +3836,7 @@ TEST_CASE("noSim/ShardedRocksDB/RangeOps") { kvStore->dispose(); wait(closed); } - + ASSERT(!directoryExists(rocksDBTestDir)); return Void(); } @@ -3956,6 +3943,7 @@ TEST_CASE("noSim/ShardedRocksDB/ShardOps") { kvStore->dispose(); wait(closed); } + ASSERT(!directoryExists(rocksDBTestDir)); return Void(); } @@ -4105,7 +4093,7 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") { kvStore->dispose(); wait(closed); } - + ASSERT(!directoryExists(rocksDBTestDir)); return Void(); } @@ -4413,6 +4401,7 @@ TEST_CASE("perf/ShardedRocksDB/RangeClearSysKey") { Future closed = kvStore->onClosed(); kvStore->dispose(); wait(closed); + ASSERT(!directoryExists(rocksDBTestDir)); return Void(); } @@ -4473,6 +4462,7 @@ TEST_CASE("perf/ShardedRocksDB/RangeClearUserKey") { Future closed = kvStore->onClosed(); kvStore->dispose(); wait(closed); + ASSERT(!directoryExists(rocksDBTestDir)); return Void(); } } // namespace diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 251a13e026a..ed8340f5d09 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1953,9 +1953,7 @@ ACTOR static Future finishMoveShards(Database occ, wait(waitForAll(actors)); if (range.end == dataMove.ranges.front().end) { - if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) { - 
wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId)); - } + wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId)); tr.clear(dataMoveKeyFor(dataMoveId)); complete = true; TraceEvent(sevDm, "FinishMoveShardsDeleteMetaData", dataMoveId) @@ -2707,9 +2705,7 @@ ACTOR Future cleanUpDataMoveCore(Database occ, } if (range.end == dataMove.ranges.front().end) { - if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) { - wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId)); - } + wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId)); tr.clear(dataMoveKeyFor(dataMoveId)); complete = true; TraceEvent(sevDm, "CleanUpDataMoveDeleteMetaData", dataMoveId) diff --git a/fdbserver/TransactionTagCounter.actor.cpp b/fdbserver/TransactionTagCounter.actor.cpp new file mode 100644 index 00000000000..c450dc82a01 --- /dev/null +++ b/fdbserver/TransactionTagCounter.actor.cpp @@ -0,0 +1,185 @@ +/* + * TransactionTagCounter.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/NativeAPI.actor.h" +#include "fdbserver/Knobs.h" +#include "fdbserver/TransactionTagCounter.h" +#include "flow/Trace.h" +#include "flow/actorcompiler.h" + +class TransactionTagCounterImpl { + UID thisServerID; + TransactionTagMap intervalCosts; + double intervalTotalCost = 0; + double intervalStart = 0; + int maxTagsTracked; + double minRateTracked; + + std::vector previousBusiestTags; + Reference busiestReadTagEventHolder; + + std::vector getBusiestTagsFromLastInterval(double elapsed) const { + std::priority_queue, std::greater> topKTags; + for (auto const& [tag, cost] : intervalCosts) { + auto const rate = cost / elapsed; + auto const fractionalBusyness = std::min(1.0, cost / intervalTotalCost); + if (rate < minRateTracked) { + continue; + } else if (topKTags.size() < maxTagsTracked) { + topKTags.emplace(tag, rate, fractionalBusyness); + } else if (topKTags.top().rate < rate) { + topKTags.pop(); + topKTags.emplace(tag, rate, fractionalBusyness); + } + } + std::vector result; + while (!topKTags.empty()) { + result.push_back(std::move(topKTags.top())); + topKTags.pop(); + } + return result; + } + +public: + TransactionTagCounterImpl(UID thisServerID, int maxTagsTracked, double minRateTracked) + : thisServerID(thisServerID), maxTagsTracked(maxTagsTracked), minRateTracked(minRateTracked), + busiestReadTagEventHolder(makeReference(thisServerID.toString() + "/BusiestReadTag")) {} + + void addRequest(Optional const& tags, int64_t bytes) { + auto const cost = getReadOperationCost(bytes); + intervalTotalCost += cost; + if (tags.present()) { + for (auto const& tag : tags.get()) { + CODE_PROBE(true, "Tracking transaction tag in TransactionTagCounter"); + intervalCosts[TransactionTag(tag, tags.get().getArena())] += cost / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE; + } + } + } + + void startNewInterval() { + double elapsed = now() - intervalStart; 
+ previousBusiestTags.clear(); + if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) { + previousBusiestTags = getBusiestTagsFromLastInterval(elapsed); + + // For status, report the busiest tag: + if (previousBusiestTags.empty()) { + TraceEvent("BusiestReadTag", thisServerID).detail("TagCost", 0.0); + } else { + auto busiestTagInfo = previousBusiestTags[0]; + for (int i = 1; i < previousBusiestTags.size(); ++i) { + auto const& tagInfo = previousBusiestTags[i]; + if (tagInfo.rate > busiestTagInfo.rate) { + busiestTagInfo = tagInfo; + } + } + TraceEvent("BusiestReadTag", thisServerID) + .detail("Tag", printable(busiestTagInfo.tag)) + .detail("TagCost", busiestTagInfo.rate) + .detail("FractionalBusyness", busiestTagInfo.fractionalBusyness); + } + + for (const auto& tagInfo : previousBusiestTags) { + TraceEvent("BusyReadTag", thisServerID) + .detail("Tag", printable(tagInfo.tag)) + .detail("TagCost", tagInfo.rate) + .detail("FractionalBusyness", tagInfo.fractionalBusyness); + } + } + + intervalCosts.clear(); + intervalTotalCost = 0; + intervalStart = now(); + } + + std::vector const& getBusiestTags() const { return previousBusiestTags; } +}; + +TransactionTagCounter::TransactionTagCounter(UID thisServerID, int maxTagsTracked, double minRateTracked) + : impl(PImpl::create(thisServerID, maxTagsTracked, minRateTracked)) {} + +TransactionTagCounter::~TransactionTagCounter() = default; + +void TransactionTagCounter::addRequest(Optional const& tags, int64_t bytes) { + return impl->addRequest(tags, bytes); +} + +void TransactionTagCounter::startNewInterval() { + return impl->startNewInterval(); +} + +std::vector const& TransactionTagCounter::getBusiestTags() const { + return impl->getBusiestTags(); +} + +namespace { + +bool containsTag(std::vector const& busyTags, TransactionTagRef tag) { + return std::count_if(busyTags.begin(), busyTags.end(), [tag](auto const& tagInfo) { return tagInfo.tag == tag; }) == + 1; +} + +TagSet getTagSet(TransactionTagRef tag) { + TagSet result; + result.addTag(tag); + return result; +} + +} // namespace + +TEST_CASE("/fdbserver/TransactionTagCounter/IgnoreBeyondMaxTags") { + state TransactionTagCounter counter(UID(), + /*maxTagsTracked=*/2, + /*minRateTracked=*/10.0 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE / + CLIENT_KNOBS->READ_TAG_SAMPLE_RATE); + counter.startNewInterval(); + ASSERT_EQ(counter.getBusiestTags().size(), 0); + { + wait(delay(1.0)); + counter.addRequest(getTagSet("tagA"_sr), 10 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE); + counter.addRequest(getTagSet("tagA"_sr), 10 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE); + counter.addRequest(getTagSet("tagB"_sr), 15 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE); + counter.addRequest(getTagSet("tagC"_sr), 20 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE); + counter.startNewInterval(); + auto const busiestTags = counter.getBusiestTags(); + ASSERT_EQ(busiestTags.size(), 2); + ASSERT(containsTag(busiestTags, "tagA"_sr)); + ASSERT(!containsTag(busiestTags, "tagB"_sr)); + ASSERT(containsTag(busiestTags, "tagC"_sr)); + } + return Void(); +} + +TEST_CASE("/fdbserver/TransactionTagCounter/IgnoreBelowMinRate") { + state TransactionTagCounter counter(UID(), + /*maxTagsTracked=*/2, + /*minRateTracked=*/10.0 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE / + CLIENT_KNOBS->READ_TAG_SAMPLE_RATE); + counter.startNewInterval(); + ASSERT_EQ(counter.getBusiestTags().size(), 0); + { + wait(delay(1.0)); + counter.addRequest(getTagSet("tagA"_sr), 5 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE); + counter.startNewInterval(); + auto 
const busiestTags = counter.getBusiestTags(); + ASSERT_EQ(busiestTags.size(), 0); + } + return Void(); +} diff --git a/fdbserver/TransactionTagCounter.cpp b/fdbserver/TransactionTagCounter.cpp deleted file mode 100644 index ff46d28f86a..00000000000 --- a/fdbserver/TransactionTagCounter.cpp +++ /dev/null @@ -1,227 +0,0 @@ -/* - * TransactionTagCounter.cpp - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "fdbclient/NativeAPI.actor.h" -#include "fdbserver/Knobs.h" -#include "fdbserver/TransactionTagCounter.h" -#include "flow/Trace.h" - -namespace { - -class TopKTags { -public: - struct TagAndCount { - TransactionTag tag; - int64_t count; - bool operator<(TagAndCount const& other) const { return count < other.count; } - explicit TagAndCount(TransactionTag tag, int64_t count) : tag(tag), count(count) {} - }; - -private: - // Because the number of tracked is expected to be small, they can be tracked - // in a simple vector. If the number of tracked tags increases, a more sophisticated - // data structure will be required. - std::vector topTags; - int limit; - -public: - explicit TopKTags(int limit) : limit(limit) { - ASSERT_GT(limit, 0); - topTags.reserve(limit); - } - - void incrementCount(TransactionTag tag, int previousCount, int increase) { - auto iter = std::find_if(topTags.begin(), topTags.end(), [tag](const auto& tc) { return tc.tag == tag; }); - if (iter != topTags.end()) { - ASSERT_EQ(previousCount, iter->count); - iter->count += increase; - } else if (topTags.size() < limit) { - ASSERT_EQ(previousCount, 0); - topTags.emplace_back(tag, increase); - } else { - auto toReplace = std::min_element(topTags.begin(), topTags.end()); - ASSERT_GE(toReplace->count, previousCount); - if (toReplace->count < previousCount + increase) { - toReplace->tag = tag; - toReplace->count = previousCount + increase; - } - } - } - - std::vector getBusiestTags(double elapsed, double totalSampleCount) const { - std::vector result; - for (auto const& tagAndCounter : topTags) { - auto rate = (tagAndCounter.count / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE) / elapsed; - if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE) { - result.emplace_back(tagAndCounter.tag, rate, tagAndCounter.count / totalSampleCount); - } - } - return result; - } - - void clear() { topTags.clear(); } -}; - -} // namespace - -class TransactionTagCounterImpl { - UID thisServerID; - TransactionTagMap intervalCounts; - int64_t intervalTotalSampledCount = 0; - TopKTags topTags; - double intervalStart = 0; - - std::vector previousBusiestTags; - Reference busiestReadTagEventHolder; - -public: - TransactionTagCounterImpl(UID thisServerID) - : thisServerID(thisServerID), topTags(SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED), - busiestReadTagEventHolder(makeReference(thisServerID.toString() + "/BusiestReadTag")) {} - - void addRequest(Optional const& tags, 
int64_t bytes) { - if (tags.present()) { - CODE_PROBE(true, "Tracking transaction tag in counter"); - auto const cost = getReadOperationCost(bytes); - for (auto& tag : tags.get()) { - int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())]; - topTags.incrementCount(tag, count, cost); - count += cost; - } - - intervalTotalSampledCount += cost; - } - } - - void startNewInterval() { - double elapsed = now() - intervalStart; - previousBusiestTags.clear(); - if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) { - previousBusiestTags = topTags.getBusiestTags(elapsed, intervalTotalSampledCount); - - // For status, report the busiest tag: - if (previousBusiestTags.empty()) { - TraceEvent("BusiestReadTag", thisServerID).detail("TagCost", 0.0); - } else { - auto busiestTagInfo = previousBusiestTags[0]; - for (int i = 1; i < previousBusiestTags.size(); ++i) { - auto const& tagInfo = previousBusiestTags[i]; - if (tagInfo.rate > busiestTagInfo.rate) { - busiestTagInfo = tagInfo; - } - } - TraceEvent("BusiestReadTag", thisServerID) - .detail("Tag", busiestTagInfo.tag) - .detail("TagCost", busiestTagInfo.rate) - .detail("FractionalBusyness", busiestTagInfo.fractionalBusyness); - } - - for (const auto& tagInfo : previousBusiestTags) { - TraceEvent("BusyReadTag", thisServerID) - .detail("Tag", tagInfo.tag) - .detail("TagCost", tagInfo.rate) - .detail("FractionalBusyness", tagInfo.fractionalBusyness); - } - } - - intervalCounts.clear(); - intervalTotalSampledCount = 0; - topTags.clear(); - intervalStart = now(); - } - - std::vector const& getBusiestTags() const { return previousBusiestTags; } -}; - -TransactionTagCounter::TransactionTagCounter(UID thisServerID) - : impl(PImpl::create(thisServerID)) {} - -TransactionTagCounter::~TransactionTagCounter() = default; - -void TransactionTagCounter::addRequest(Optional const& tags, int64_t bytes) { - return impl->addRequest(tags, bytes); -} - -void TransactionTagCounter::startNewInterval() { - return impl->startNewInterval(); -} - -std::vector const& TransactionTagCounter::getBusiestTags() const { - return impl->getBusiestTags(); -} - -TEST_CASE("/TransactionTagCounter/TopKTags") { - TopKTags topTags(2); - - // Ensure that costs are larger enough to show up - auto const costMultiplier = - std::max(1.0, - 2 * SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE * - CLIENT_KNOBS->READ_TAG_SAMPLE_RATE); - - ASSERT_EQ(topTags.getBusiestTags(1.0, 0).size(), 0); - topTags.incrementCount("a"_sr, 0, 1 * costMultiplier); - { - auto const busiestTags = topTags.getBusiestTags(1.0, 1 * costMultiplier); - ASSERT_EQ(busiestTags.size(), 1); - ASSERT_EQ(std::count_if(busiestTags.begin(), - busiestTags.end(), - [](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }), - 1); - } - topTags.incrementCount("b"_sr, 0, 2 * costMultiplier); - topTags.incrementCount("c"_sr, 0, 3 * costMultiplier); - { - auto busiestTags = topTags.getBusiestTags(1.0, 6 * costMultiplier); - ASSERT_EQ(busiestTags.size(), 2); - ASSERT_EQ(std::count_if(busiestTags.begin(), - busiestTags.end(), - [](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }), - 0); - ASSERT_EQ(std::count_if(busiestTags.begin(), - busiestTags.end(), - [](auto const& tagInfo) { return tagInfo.tag == "b"_sr; }), - 1); - ASSERT_EQ(std::count_if(busiestTags.begin(), - busiestTags.end(), - [](auto const& tagInfo) { return tagInfo.tag == "c"_sr; }), - 1); - } - topTags.incrementCount("a"_sr, 1 * costMultiplier, 3 * costMultiplier); - { - auto busiestTags = 
diff --git a/fdbserver/include/fdbserver/Ratekeeper.h b/fdbserver/include/fdbserver/Ratekeeper.h
index 1a5237666e8..a80355a361f 100644
--- a/fdbserver/include/fdbserver/Ratekeeper.h
+++ b/fdbserver/include/fdbserver/Ratekeeper.h
@@ -72,7 +72,7 @@ class StorageQueueInfo {
 	StorageQueuingMetricsReply lastReply;
 	bool acceptingRequests;
 	limitReason_t limitReason;
-	std::vector busiestReadTags, busiestWriteTags;
+	std::vector busiestReadTags, busiestWriteTags;

 	StorageQueueInfo(const UID& id, const LocalityData& locality);
 	StorageQueueInfo(const UID& rateKeeperID, const UID& id, const LocalityData& locality);
diff --git a/fdbserver/include/fdbserver/TransactionTagCounter.h b/fdbserver/include/fdbserver/TransactionTagCounter.h
index 6e2b424e6f1..476e7686317 100644
--- a/fdbserver/include/fdbserver/TransactionTagCounter.h
+++ b/fdbserver/include/fdbserver/TransactionTagCounter.h
@@ -28,7 +28,7 @@ class TransactionTagCounter {
 	PImpl impl;

 public:
-	TransactionTagCounter(UID thisServerID);
+	TransactionTagCounter(UID thisServerID, int maxTagsTracked, double minRateTracked);
 	~TransactionTagCounter();

 	// Update counters tracking the busyness of each tag in the current interval
@@ -38,5 +38,5 @@ class TransactionTagCounter {
 	void startNewInterval();

 	// Returns the set of busiest tags as of the end of the last interval
-	std::vector const& getBusiestTags() const;
+	std::vector const& getBusiestTags() const;
 };
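The header keeps the three-call surface its comments describe: addRequest() accumulates cost for each tag while an interval is open, startNewInterval() closes that interval, and getBusiestTags() returns results for the interval that just ended; the constructor now takes maxTagsTracked and minRateTracked explicitly instead of reading knobs internally. A rough, self-contained sketch of that interval pattern follows; IntervalTagCounter is an invented name, and the real class reports rates filtered by minRateTracked rather than raw counts.

    // Sketch: accumulate into the open interval, latch the busiest tags when the
    // interval is closed, and let readers see only the latched snapshot.
    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    class IntervalTagCounter {
        std::map<std::string, int64_t> open;                   // counts for the interval in progress
        std::vector<std::pair<std::string, int64_t>> latched;  // busiest tags of the last closed interval
        std::size_t maxTagsTracked;

    public:
        explicit IntervalTagCounter(std::size_t maxTagsTracked) : maxTagsTracked(maxTagsTracked) {}

        void addRequest(const std::string& tag, int64_t cost) { open[tag] += cost; }

        void startNewInterval() {
            latched.assign(open.begin(), open.end());
            std::sort(latched.begin(), latched.end(),
                      [](const auto& a, const auto& b) { return a.second > b.second; });
            if (latched.size() > maxTagsTracked)
                latched.resize(maxTagsTracked);
            open.clear();
        }

        const std::vector<std::pair<std::string, int64_t>>& getBusiestTags() const { return latched; }
    };

    int main() {
        IntervalTagCounter counter(/*maxTagsTracked=*/2);
        counter.addRequest("a", 5);
        counter.addRequest("b", 9);
        counter.addRequest("c", 1);
        counter.startNewInterval(); // latches the two busiest tags: b, then a
        for (const auto& [tag, cost] : counter.getBusiestTags())
            std::cout << tag << " " << cost << "\n";
        return 0;
    }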
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index 9754bd63c77..419131f3dbe 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -997,7 +997,7 @@ struct StorageServer : public IStorageMetricsService {
 	std::unordered_map> moveInShards;

 	bool shardAware; // True if the storage server is aware of the physical shards.
-	Future auditSSShardInfoActor;
+	std::unordered_map> auditTasks;

 	// Histograms
 	struct FetchKeysHistograms {
@@ -1641,7 +1641,11 @@ struct StorageServer : public IStorageMetricsService {
 	    serveAuditStorageParallelismLock(SERVER_KNOBS->SERVE_AUDIT_STORAGE_PARALLELISM),
 	    instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false), versionBehind(false),
 	    debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
-	    lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()),
+	    lastDurableVersionEBrake(0), maxQueryQueue(0),
+	    transactionTagCounter(ssi.id(),
+	                          /*maxTagsTracked=*/SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED,
+	                          /*minRateTracked=*/SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE *
+	                              CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE),
 	    busiestWriteTagContext(ssi.id()), counters(this),
 	    storageServerSourceTLogIDEventHolder(
 	        makeReference(ssi.id().toString() + "/StorageServerSourceTLogID")),
@@ -4912,6 +4916,10 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data,
 	state int validatedKeys = 0;
 	state std::string error;
 	state int64_t cumulatedValidatedKeysNum = 0;
+	state Reference rateLimiter =
+	    Reference(new SpeedLimit(CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX, 1));
+	state int64_t remoteReadBytes = 0;
+
 	loop {
 		try {
 			std::vector>> fs;
@@ -4957,6 +4965,7 @@
 			}
 			const GetKeyValuesReply &remote = reps[0].get(), local = reps[1].get();
+			remoteReadBytes = remote.data.expectedSize();
 			Key lastKey = range.begin;

 			auditState.range = range;
@@ -5034,6 +5043,9 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data,
 				auditState.ddId = ddId; // used to compare req.ddId with existing persisted ddId
 				wait(persistAuditStateByRange(data->cx, auditState));
 			}
+
+			wait(rateLimiter->getAllowance(remoteReadBytes)); // RateKeeping
+
 		} catch (Error& e) {
 			TraceEvent(SevWarn, "ValidateRangeAgainstServerFailed", data->thisServerID)
 			    .errorUnsuppressed(e)
@@ -5341,9 +5353,15 @@ struct AuditGetServerKeysRes {
 	Version readAtVersion;
 	UID serverId;
 	std::vector ownRanges;
+	int64_t readBytes;
 	AuditGetServerKeysRes() = default;
-	AuditGetServerKeysRes(KeyRange completeRange, Version readAtVersion, UID serverId, std::vector ownRanges)
-	  : completeRange(completeRange), readAtVersion(readAtVersion), serverId(serverId), ownRanges(ownRanges) {}
+	AuditGetServerKeysRes(KeyRange completeRange,
+	                      Version readAtVersion,
+	                      UID serverId,
+	                      std::vector ownRanges,
+	                      int64_t readBytes)
+	  : completeRange(completeRange), readAtVersion(readAtVersion), serverId(serverId), ownRanges(ownRanges),
+	    readBytes(readBytes) {}
 };

 // Given an input server, get ranges within the input range via the input transaction
@@ -5391,7 +5409,7 @@ ACTOR Future getThisServerKeysFromServerKeys(UID serverID
 		    .detail("ReadAtVersion", readAtVersion)
 		    .detail("CompleteRange", completeRange)
 		    .detail("ResultSize", ownRanges.size());
-		res = AuditGetServerKeysRes(completeRange, readAtVersion, serverID, ownRanges);
+		res = AuditGetServerKeysRes(completeRange, readAtVersion, serverID, ownRanges, readResult.logicalSize());

 	} catch (Error& e) {
 		TraceEvent(SevDebug, "AuditStorageGetThisServerKeysError", serverID)
@@ -5406,12 +5424,15 @@ ACTOR Future getThisServerKeysFromServerKeys(UID serverID
 struct AuditGetKeyServersRes {
 	KeyRange completeRange;
 	Version readAtVersion;
+	int64_t readBytes;
 	std::unordered_map> rangeOwnershipMap;
 	AuditGetKeyServersRes() = default;
 	AuditGetKeyServersRes(KeyRange completeRange,
 	                      Version readAtVersion,
-	                      std::unordered_map> rangeOwnershipMap)
-	  : completeRange(completeRange), readAtVersion(readAtVersion), rangeOwnershipMap(rangeOwnershipMap) {}
+	                      std::unordered_map> rangeOwnershipMap,
+	                      int64_t readBytes)
+	  : completeRange(completeRange), readAtVersion(readAtVersion), rangeOwnershipMap(rangeOwnershipMap),
+	    readBytes(readBytes) {}
 };

 // Given an input server, get ranges within the input range via the input transaction
@@ -5471,7 +5492,7 @@ ACTOR Future getShardMapFromKeyServers(UID auditServerId,
 		    .detail("AtVersion", readAtVersion)
 		    .detail("ShardsInAnonymousPhysicalShardCount", shardsInAnonymousPhysicalShardCount)
 		    .detail("TotalShardsCount", totalShardsCount);
-		res = AuditGetKeyServersRes(completeRange, readAtVersion, serverOwnRanges);
+		res = AuditGetKeyServersRes(completeRange, readAtVersion, serverOwnRanges, readResult.logicalSize());

 	} catch (Error& e) {
 		TraceEvent(SevDebug, "AuditStorageGetThisServerKeysFromKeyServersError", auditServerId)
@@ -5513,6 +5534,9 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto
 	state int retryCount = 0;
 	state int64_t cumulatedValidatedLocalShardsNum = 0;
 	state int64_t cumulatedValidatedServerKeysNum = 0;
+	state Reference rateLimiter =
+	    Reference(new SpeedLimit(CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX, 1));
+	state int64_t remoteReadBytes = 0;

 	try {
 		while (true) {
@@ -5544,12 +5568,14 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto
 			serverKeyCompleteRange = serverKeyRes.completeRange;
 			serverKeyReadAtVersion = serverKeyRes.readAtVersion;
 			ownRangesSeenByServerKey = serverKeyRes.ownRanges;
+			remoteReadBytes = serverKeyRes.readBytes;
 			// We want to do transactional read at a version newer than data->version
 			while (serverKeyReadAtVersion < localShardInfoReadAtVersion) {
 				if (retryCount >= SERVER_KNOBS->AUDIT_RETRY_COUNT_MAX) {
 					failureReason = "Read serverKeys retry count exceeds the max";
 					throw audit_storage_failed();
 				}
+				wait(rateLimiter->getAllowance(remoteReadBytes)); // RateKeeping
 				retryCount++;
 				wait(delay(0.5));
 				tr.reset();
@@ -5558,6 +5584,7 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto
 				serverKeyCompleteRange = serverKeyRes.completeRange;
 				serverKeyReadAtVersion = serverKeyRes.readAtVersion;
 				ownRangesSeenByServerKey = serverKeyRes.ownRanges;
+				remoteReadBytes = serverKeyRes.readBytes;
 			} // retry until serverKeyReadAtVersion is as larger as localShardInfoReadAtVersion
 			ASSERT(serverKeyReadAtVersion >= localShardInfoReadAtVersion);
 			try {
@@ -5681,6 +5708,8 @@ ACTOR Future auditStorageStorageServerShardQ(StorageServer* data, AuditSto
 					break;
 				}
 			}
+
+			wait(rateLimiter->getAllowance(remoteReadBytes)); // RateKeeping
 		}
 	} catch (Error& e) {
 		TraceEvent(SevInfo, "AuditStorageSsShardFailed", data->thisServerID)
@@ -5734,6 +5763,9 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora
 	state KeyRangeRef rangeToRead;
 	state int64_t cumulatedValidatedServerKeysNum = 0;
 	state int64_t cumulatedValidatedKeyServersNum = 0;
+	state Reference rateLimiter =
+	    Reference(new SpeedLimit(CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX, 1));
+	state int64_t remoteReadBytes = 0;

 	try {
 		while (true) {
@@ -5744,6 +5776,7 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora
 			mapFromKeyServers.clear();
 			serverKeyResMap.clear();
 			mapFromKeyServersRaw.clear();
+			remoteReadBytes = 0;
 			rangeToRead = KeyRangeRef(rangeToReadBegin, req.range.end);

 			tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
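The hunks above all wire the same pacing pattern into validateRangeAgainstServer, auditStorageStorageServerShardQ, and auditStorageLocationMetadataQ: remember how many bytes the last remote read returned, then wait on a SpeedLimit-style limiter for that allowance before the next iteration, keeping audit traffic near CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX. Below is a minimal stand-alone sketch of such a byte-rate limiter using plain threads rather than flow futures; SimpleSpeedLimit is an invented name, not FoundationDB's SpeedLimit.

    // Sketch: charge each read's byte count against a budget and sleep until the
    // long-run average rate falls back under bytesPerSecond.
    #include <chrono>
    #include <thread>

    class SimpleSpeedLimit {
        double bytesPerSecond;
        std::chrono::steady_clock::time_point nextFree = std::chrono::steady_clock::now();

    public:
        explicit SimpleSpeedLimit(double bytesPerSecond) : bytesPerSecond(bytesPerSecond) {}

        // Blocks until `bytes` more bytes may be consumed without exceeding the budget.
        void getAllowance(long bytes) {
            auto now = std::chrono::steady_clock::now();
            if (nextFree < now)
                nextFree = now;
            std::this_thread::sleep_until(nextFree);
            nextFree += std::chrono::duration_cast<std::chrono::steady_clock::duration>(
                std::chrono::duration<double>(bytes / bytesPerSecond));
        }
    };

    int main() {
        SimpleSpeedLimit limiter(/*bytesPerSecond=*/1 << 20); // hypothetical 1 MiB/s budget
        for (int i = 0; i < 3; ++i) {
            long remoteReadBytes = 256 * 1024; // pretend each remote read returned 256 KiB
            // ... issue the remote read here ...
            limiter.getAllowance(remoteReadBytes); // pace before the next read
        }
        return 0;
    }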
@@ -5752,6 +5785,7 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora
 			completeRangeByKeyServer = keyServerRes.completeRange;
 			readAtVersion = keyServerRes.readAtVersion;
 			mapFromKeyServersRaw = keyServerRes.rangeOwnershipMap;
+			remoteReadBytes += keyServerRes.readBytes;
 			// Use ssid of mapFromKeyServersRaw to read ServerKeys
 			for (auto& [ssid, _] : mapFromKeyServersRaw) {
 				actors.push_back(store(serverKeyResMap[ssid], getThisServerKeysFromServerKeys(ssid, &tr, rangeToRead)));
@@ -5771,6 +5805,7 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora
 				ASSERT(!overlappingRange.empty());
 				claimRange = overlappingRange;
 				ASSERT(readAtVersion == serverKeyRes.readAtVersion);
+				remoteReadBytes += serverKeyRes.readBytes;
 			}
 			// Use claimRange to get mapFromServerKeys and mapFromKeyServers to compare
 			int64_t numValidatedServerKeys = 0;
@@ -5915,6 +5950,7 @@ ACTOR Future auditStorageLocationMetadataQ(StorageServer* data, AuditStora
 					break;
 				}
 			}
+			wait(rateLimiter->getAllowance(remoteReadBytes)); // Rate Keeping
 		}

 	} catch (Error& e) {
@@ -9439,6 +9475,8 @@ ACTOR Future cleanUpMoveInShard(StorageServer* data, Version version, Move
 	}
 	wait(data->durableVersion.whenAtLeast(mLV.version + 1));

+	data->moveInShards.erase(moveInShard->id());
+
 	return Void();
 }

@@ -13514,20 +13552,43 @@ ACTOR Future storageServerCore(StorageServer* self, StorageServerInterface
 				self->actors.add(fetchCheckpointKeyValuesQ(self, req));
 			}
 			when(AuditStorageRequest req = waitNext(ssi.auditStorage.getFuture())) {
-				// A SS can run one ValidateStorageServerShard at a time
-				// We do not have this limitation on other audit types
-				if (req.getType() == AuditType::ValidateStorageServerShard) {
-					if (self->auditSSShardInfoActor.isValid() && !self->auditSSShardInfoActor.isReady()) {
-						TraceEvent(SevWarn, "ExistRunningAuditStorageForServerShard")
-						    .detail("NewAuditId", req.id)
-						    .detail("NewAuditType", req.getType());
-						self->auditSSShardInfoActor.cancel();
-					} // New audit immediately starts and existing one gets cancelled
-					self->auditSSShardInfoActor = auditStorageStorageServerShardQ(self, req);
+				// Check existing audit task states
+				if (self->auditTasks.contains(req.getType())) {
+					if (req.id != self->auditTasks[req.getType()].first) {
+						// Any task of past audit must be ready
+						if (!self->auditTasks[req.getType()].second.getResult().isReady()) {
+							req.reply.sendError(audit_storage_exceeded_request_limit());
+							TraceEvent(SevWarnAlways, "ExistSSAuditWithDifferentId") // unexpected
+							    .detail("NewAuditId", req.id)
+							    .detail("NewAuditType", req.getType());
+							continue;
+						}
+					} else if (req.getType() == AuditType::ValidateStorageServerShard &&
+					           !self->auditTasks[req.getType()].second.getResult().isReady()) {
+						// Only one ValidateStorageServerShard is allowed to run at a time
+						TraceEvent(SevWarn, "ExistSSAuditForServerShardWithSameId")
+						    .detail("AuditId", req.id)
+						    .detail("AuditType", req.getType());
+						self->auditTasks[req.getType()].second.clear(true);
+					}
+				}
+				// Prepare for the new audit task
+				if (!self->auditTasks.contains(req.getType()) ||
+				    self->auditTasks[req.getType()].second.getResult().isReady()) {
+					ASSERT(req.id.isValid());
+					self->auditTasks[req.getType()] = std::make_pair(req.id, ActorCollection(true));
+				}
+				// Start the new audit task
+				if (req.getType() == AuditType::ValidateHA) {
+					self->auditTasks[req.getType()].second.add(auditStorageQ(self, req));
+				} else if (req.getType() == AuditType::ValidateReplica) {
+					self->auditTasks[req.getType()].second.add(auditStorageQ(self, req));
 				} else if (req.getType() == AuditType::ValidateLocationMetadata) {
-					self->actors.add(auditStorageLocationMetadataQ(self, req));
+					self->auditTasks[req.getType()].second.add(auditStorageLocationMetadataQ(self, req));
+				} else if (req.getType() == AuditType::ValidateStorageServerShard) {
+					self->auditTasks[req.getType()].second.add(auditStorageStorageServerShardQ(self, req));
 				} else {
-					self->actors.add(auditStorageQ(self, req));
+					req.reply.sendError(not_implemented());
 				}
 			}
 			when(wait(updateProcessStatsTimer)) {
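The reworked dispatch above keeps one (audit id, ActorCollection) slot per audit type: a request carrying a different id is refused with audit_storage_exceeded_request_limit while the previous audit's actors are still running, and a repeated ValidateStorageServerShard request with the same id cancels and restarts its collection. A simplified, synchronous sketch of just that per-type bookkeeping follows; AuditRegistry and AuditSlot are invented names, and a plain bool stands in for the ActorCollection's readiness.

    // Sketch: at most one audit id may own the slot for a given audit type.
    #include <functional>
    #include <iostream>
    #include <string>
    #include <unordered_map>

    enum class AuditType { ValidateHA, ValidateReplica, ValidateLocationMetadata, ValidateStorageServerShard };

    struct AuditSlot {
        std::string auditId;
        bool running = false; // stands in for "the audit's actors have not finished"
    };

    class AuditRegistry {
        std::unordered_map<AuditType, AuditSlot> slots;

    public:
        // Returns false (request rejected) if a different audit id still holds the slot.
        bool tryStart(AuditType type, const std::string& auditId) {
            auto it = slots.find(type);
            if (it != slots.end() && it->second.running && it->second.auditId != auditId)
                return false;
            slots[type] = AuditSlot{ auditId, /*running=*/true };
            return true;
        }

        void finish(AuditType type) { slots[type].running = false; }
    };

    int main() {
        AuditRegistry registry;
        std::cout << registry.tryStart(AuditType::ValidateHA, "audit-1") << "\n"; // 1: slot acquired
        std::cout << registry.tryStart(AuditType::ValidateHA, "audit-2") << "\n"; // 0: rejected, audit-1 still running
        registry.finish(AuditType::ValidateHA);
        std::cout << registry.tryStart(AuditType::ValidateHA, "audit-2") << "\n"; // 1: previous audit has drained
        return 0;
    }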
@@ -13890,6 +13951,8 @@ ACTOR Future storageServer(IKeyValueStore* persistentData,

 	self.ssLock->halt();

+	self.moveInShards.clear();
+
 	state Error err = e;
 	if (storageServerTerminated(self, persistentData, err)) {
 		ssCore.cancel();
diff --git a/fdbserver/workloads/PhysicalShardMove.actor.cpp b/fdbserver/workloads/PhysicalShardMove.actor.cpp
index 287d5426396..05f82a72370 100644
--- a/fdbserver/workloads/PhysicalShardMove.actor.cpp
+++ b/fdbserver/workloads/PhysicalShardMove.actor.cpp
@@ -101,7 +101,6 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
 		                       newDataMoveId(deterministicRandom()->randomUInt64(),
 		                                     AssignEmptyRange::False,
 		                                     EnablePhysicalShardMove::True),
-		                       // EnablePhysicalShardMove::False),
 		                       KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr),
 		                       teamSize,
 		                       includes,
diff --git a/fdbserver/workloads/RestoreMultiRanges.actor.cpp b/fdbserver/workloads/RestoreMultiRanges.actor.cpp
new file mode 100644
index 00000000000..b4e7e0ff5b4
--- /dev/null
+++ b/fdbserver/workloads/RestoreMultiRanges.actor.cpp
@@ -0,0 +1,172 @@
+/*
+ * RestoreMultiRanges.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fdbclient/FDBTypes.h"
+#include "fdbclient/ReadYourWrites.h"
+#include "fdbrpc/simulator.h"
+#include "fdbclient/BackupAgent.actor.h"
+#include "fdbclient/BackupContainer.h"
+#include "fdbserver/workloads/workloads.actor.h"
+#include "flow/actorcompiler.h" // This must be the last #include.
+
+struct RestoreMultiRangesWorkload : TestWorkload {
+
+	FileBackupAgent backupAgent;
+	Reference backupContainer;
+
+	RestoreMultiRangesWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
+
+	static constexpr const char* NAME = "RestoreMultiRanges";
+
+	ACTOR static Future clearDatabase(Database cx) {
+		state Transaction tr(cx);
+		loop {
+			try {
+				tr.clear(normalKeys);
+				wait(tr.commit());
+				return Void();
+			} catch (Error& e) {
+				wait(tr.onError(e));
+			}
+		}
+	}
+
+	ACTOR static Future prepareDatabase(Database cx) {
+		state Transaction tr(cx);
+		loop {
+			try {
+				tr.reset();
+				tr.set("a"_sr, "a"_sr);
+				tr.set("aaaa"_sr, "aaaa"_sr);
+				tr.set("b"_sr, "b"_sr);
+				tr.set("bb"_sr, "bb"_sr);
+				tr.set("bbb"_sr, "bbb"_sr);
+				wait(tr.commit());
+				return Void();
+			} catch (Error& e) {
+				wait(tr.onError(e));
+			}
+		}
+	}
+
+	static void logTestData(const VectorRef& data) {
+		TraceEvent("TestFailureDetail").log();
+		int index = 0;
+		for (auto& entry : data) {
+			TraceEvent("CurrentDataEntry")
+			    .detail("Index", index)
+			    .detail("Key", entry.key.toString())
+			    .detail("Value", entry.value.toString());
+			index++;
+		}
+	}
+
+	ACTOR static Future verifyDatabase(Database cx) {
+		state UID randomID = nondeterministicRandom()->randomUniqueID();
+		TraceEvent("RestoreMultiRanges_Verify").detail("UID", randomID);
+		state Transaction tr(cx);
+		state KeyRangeRef range("a"_sr, "z"_sr);
+		loop {
+			try {
+				tr.reset();
+				tr.debugTransaction(randomID);
+				RangeResult kvs = wait(tr.getRange(range, 10));
+				if (kvs.size() != 4) {
+					logTestData(kvs);
+					TraceEvent(SevError, "TestFailureInfo")
+					    .detail("DataSize", kvs.size())
+					    .detail("Expect", 4)
+					    .detail("Workload", NAME);
+					return false;
+				}
+				KeyRef keys[4] = { "a"_sr, "aaaa"_sr, "bb"_sr, "bbb"_sr };
+				for (size_t i = 0; i < 4; ++i) {
+					if (kvs[i].key != keys[i]) {
+						TraceEvent(SevError, "TestFailureInfo")
+						    .detail("ExpectKey", keys[i])
+						    .detail("Got", kvs[i].key)
+						    .detail("Index", i);
+						return false;
+					}
+				}
+				TraceEvent("RestoreMultiRanges_VerifyPassed");
+				return true;
+			} catch (Error& e) {
+				wait(tr.onError(e));
+			}
+		}
+	}
+
+	ACTOR static Future _start(RestoreMultiRangesWorkload* self, Database cx) {
+		TraceEvent("RestoreMultiRanges_StartBackup");
+		wait(clearDatabase(cx));
+		wait(prepareDatabase(cx));
+
+		state std::string backupContainer = "file://simfdb/backups/";
+		state std::string tagName = "default";
+		state Standalone> backupRanges;
+		backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef("a"_sr, "z"_sr));
+		TraceEvent("RestoreMultiRanges_SubmitBackup");
+		try {
+			wait(self->backupAgent.submitBackup(cx,
+			                                    StringRef(backupContainer),
+			                                    {},
+			                                    deterministicRandom()->randomInt(0, 60),
+			                                    deterministicRandom()->randomInt(0, 100),
+			                                    tagName,
+			                                    backupRanges,
+			                                    true,
+			                                    StopWhenDone::True));
+		} catch (Error& e) {
+			if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
+				throw;
+		}
+
+		TraceEvent("RestoreMultiRanges_WaitBackup");
+		state Reference container;
+		wait(success(self->backupAgent.waitBackup(cx, tagName, StopWhenDone::True, &container)));
+
+		TraceEvent("RestoreMultiRanges_ClearDatabase");
+		wait(clearDatabase(cx));
+
+		TraceEvent("RestoreMultiRanges_Restore");
+		state Standalone> ranges;
+		ranges.push_back_deep(ranges.arena(), KeyRangeRef("a"_sr, "aaaaa"_sr));
+		ranges.push_back_deep(ranges.arena(), KeyRangeRef("bb"_sr, "bbbbb"_sr)); // Skip "b"
+		wait(success(self->backupAgent.restore(cx,
+		                                       cx,
+		                                       Key(tagName),
+		                                       Key(container->getURL()),
+		                                       {},
+		                                       ranges,
+		                                       WaitForComplete::True,
+		                                       ::invalidVersion,
+		                                       Verbose::True)));
+		TraceEvent("RestoreMultiRanges_Success");
+		return Void();
+	}
+
+	Future setup(Database const& cx) override { return Void(); }
+	Future start(Database const& cx) override { return clientId ? Void() : _start(this, cx); }
+	Future check(Database const& cx) override { return verifyDatabase(cx); }
+	void getMetrics(std::vector& m) override {}
+};
+
+WorkloadFactory RestoreMultiRangesWorkloadFactory;
diff --git a/fdbserver/workloads/ValidateStorage.actor.cpp b/fdbserver/workloads/ValidateStorage.actor.cpp
index 7716b01765e..c42ac3d4f5d 100644
--- a/fdbserver/workloads/ValidateStorage.actor.cpp
+++ b/fdbserver/workloads/ValidateStorage.actor.cpp
@@ -207,6 +207,10 @@ struct ValidateStorage : TestWorkload {
 			    .detail("AuditIDA", auditIdA)
 			    .detail("AuditIDB", auditIdB);
 		}
+		std::vector res = wait(getAuditStates(cx, type, /*newFirst=*/true, 1));
+		if (res.size() != 1) {
+			TraceEvent(SevError, "TestGetAuditStatesError").detail("ActualResSize", res.size());
+		}
 		return Void();
 	}

diff --git a/packaging/docker/samples/local/README.md b/packaging/docker/samples/local/README.md
index f7f5b3e979f..771cd08dbcb 100644
--- a/packaging/docker/samples/local/README.md
+++ b/packaging/docker/samples/local/README.md
@@ -7,7 +7,7 @@ the server process to its host machine.

 This depends on having the FoundationDB client installed on your host machine
 to work properly. This can be done using one of the client packages available
-on our [Download](https://www.foundationdb.org/download/) page. The startup
+from our [GitHub Releases](https://github.com/apple/foundationdb/releases). The startup
 scripts included here depend on `fdbcli` from one of those packages, and any
 client that wishes to connect will need a copy of the FoundationDB native client
 in addition to its binding of choice. Both the CLI and the native client
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 6d0b5d4e1de..4d511cd077b 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -31,7 +31,7 @@ if(WITH_PYTHON)
     message(WARNING "\
 No old fdbserver binary found - using ${fdbserver_location} \
-It is recommended to install the current stable version from https://www.foundationdb.org/download/ \
+It is recommended to install the current stable version from https://github.com/apple/foundationdb/releases \
 Or provide a path to another fdbserver")
   endif()

@@ -272,6 +272,7 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES rare/RYWDisable.toml)
   add_fdb_test(TEST_FILES rare/RandomReadWriteTest.toml)
   add_fdb_test(TEST_FILES rare/ReadSkewReadWrite.toml)
+  add_fdb_test(TEST_FILES rare/RestoreMultiRanges.toml)
   add_fdb_test(TEST_FILES rare/SpecificUnitTests.toml)
   add_fdb_test(TEST_FILES rare/StorageQuotaTest.toml)
   add_fdb_test(TEST_FILES rare/SwizzledLargeApiCorrectness.toml)
diff --git a/tests/rare/RestoreMultiRanges.toml b/tests/rare/RestoreMultiRanges.toml
new file mode 100644
index 00000000000..befdfa02ead
--- /dev/null
+++ b/tests/rare/RestoreMultiRanges.toml
@@ -0,0 +1,11 @@
+[configuration]
+tenantModes = ['disabled']
+
+[[test]]
+testTitle = 'RestoreMultiRanges'
+clearAfterTest = true
+simBackupAgents = 'BackupToFile'
+
+    [[test.workload]]
+    testName = 'RestoreMultiRanges'
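As a quick sanity check of the new workload's expectations: prepareDatabase() writes a, aaaa, b, bb, and bbb, while the restore in _start() covers only the half-open ranges ["a","aaaaa") and ["bb","bbbbb"), so "b" falls outside both ranges and verifyDatabase() should see exactly four keys. The small stand-alone program below (not part of the workload) replays that range arithmetic.

    // Cross-check which of the written keys the two restore ranges cover.
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    int main() {
        std::vector<std::pair<std::string, std::string>> ranges = { { "a", "aaaaa" }, { "bb", "bbbbb" } };
        std::vector<std::string> written = { "a", "aaaa", "b", "bb", "bbb" };
        for (const auto& key : written) {
            bool restored = false;
            for (const auto& [begin, end] : ranges)
                restored = restored || (begin <= key && key < end); // half-open, lexicographic key ranges
            std::cout << key << (restored ? " restored" : " skipped") << "\n";
        }
        // Prints: a restored, aaaa restored, b skipped, bb restored, bbb restored.
        return 0;
    }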