Skip to content

Commit

Permalink
KeyValueStoreRocksDB histograms to track latencies
Browse files Browse the repository at this point in the history
  • Loading branch information
neethuhaneesha committed Dec 15, 2021
1 parent 7697460 commit eec774b
Showing 1 changed file with 101 additions and 4 deletions.
105 changes: 101 additions & 4 deletions fdbserver/KeyValueStoreRocksDB.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "flow/flow.h"
#include "flow/IThreadPool.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/Histogram.h"

#include <memory>
#include <tuple>
Expand All @@ -35,6 +36,20 @@ static_assert((ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR == 22) ? ROCKSDB_PATCH >= 1 :

namespace {

// Histogram identifiers for the RocksDB storage engine. Every histogram in this file is obtained through
// Histogram::getHistogram(group, name, Histogram::Unit::microseconds).

// Group passed as the first argument of every getHistogram() call in this file.
const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = LiteralStringRef("RocksDBStorage");
// Writer: duration of the whole CommitAction handler.
const StringRef ROCKSDB_COMMIT_ACTION_HISTOGRAM = LiteralStringRef("RocksDBCommitAction");
// Writer: duration of the db->Write() call alone.
const StringRef ROCKSDB_WRITE_HISTOGRAM = LiteralStringRef("RocksDBWrite");
// Writer: duration of the SuggestCompactRange() loop run over deleted ranges after a successful write.
const StringRef ROCKSDB_DELETE_COMPACTRANGE_HISTOGRAM = LiteralStringRef("RocksDBDeleteCompactRange");
// Reader: duration of each read action, measured from just after its timeout check to the end of the handler.
const StringRef ROCKSDB_READRANGE_ACTION_HISTOGRAM = LiteralStringRef("RocksDBReadRangeAction");
const StringRef ROCKSDB_READVALUE_ACTION_HISTOGRAM = LiteralStringRef("RocksDBReadValueAction");
const StringRef ROCKSDB_READPREFIX_ACTION_HISTOGRAM = LiteralStringRef("RocksDBReadPrefixAction");
// Reader: time between the action's startTime and the moment its handler begins executing.
const StringRef ROCKSDB_READRANGE_QUEUEWAIT_HISTOGRAM = LiteralStringRef("RocksDBReadRangeQueueWait");
const StringRef ROCKSDB_READVALUE_QUEUEWAIT_HISTOGRAM = LiteralStringRef("RocksDBReadValueQueueWait");
const StringRef ROCKSDB_READPREFIX_QUEUEWAIT_HISTOGRAM = LiteralStringRef("RocksDBReadPrefixQueueWait");
// Reader: duration of the underlying RocksDB call only (db->NewIterator() for range reads, db->Get() otherwise).
const StringRef ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM = LiteralStringRef("RocksDBReadRangeNewIterator");
const StringRef ROCKSDB_READVALUE_GET_HISTOGRAM = LiteralStringRef("RocksDBReadValueGet");
const StringRef ROCKSDB_READPREFIX_GET_HISTOGRAM = LiteralStringRef("RocksDBReadPrefixGet");

// Wraps the bytes referenced by `s` in a rocksdb::Slice. The Slice is built from the same pointer and
// length as `s`; nothing is copied here.
rocksdb::Slice toSlice(StringRef s) {
	const char* bytes = reinterpret_cast<const char*>(s.begin());
	return rocksdb::Slice(bytes, s.size());
}
Expand Down Expand Up @@ -231,8 +246,20 @@ struct RocksDBKeyValueStore : IKeyValueStore {
struct Writer : IThreadPoolReceiver {
DB& db;
UID id;

explicit Writer(DB& db, UID id) : db(db), id(id) {}
Reference<Histogram> commitActionHistogram;
Reference<Histogram> writeHistogram;
Reference<Histogram> deleteCompactRangeHistogram;

// Binds the writer to `db`, records `id`, and looks up the three microsecond-unit histograms (all in the
// RocksDBStorage group) that the commit path samples: whole commit action, db->Write(), and the
// post-commit SuggestCompactRange() loop.
explicit Writer(DB& db, UID id)
  : db(db),
    id(id),
    commitActionHistogram(
        Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
                                ROCKSDB_COMMIT_ACTION_HISTOGRAM,
                                Histogram::Unit::microseconds)),
    writeHistogram(
        Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
                                ROCKSDB_WRITE_HISTOGRAM,
                                Histogram::Unit::microseconds)),
    deleteCompactRangeHistogram(
        Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
                                ROCKSDB_DELETE_COMPACTRANGE_HISTOGRAM,
                                Histogram::Unit::microseconds)) {
}

~Writer() override {
if (db) {
Expand Down Expand Up @@ -304,25 +331,34 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
// Commits the write batch carried by `a` and records latency histograms for the commit.
//
// Steps:
//   1. Walk the batch with a DeleteVisitor to collect the key ranges it deletes.
//   2. Write the batch with db->Write() (sync unless ROCKSDB_UNSAFE_AUTO_FSYNC is set).
//   3. On failure, log the status and fail a.done with the mapped error.
//      On success, fulfil a.done first and only then issue SuggestCompactRange() for each deleted range,
//      so the committer is not kept waiting on the compaction hints.
//
// Histograms sampled: writeHistogram (step 2 only), deleteCompactRangeHistogram (the compaction-hint loop,
// sampled on every successful commit even when there are no deleted ranges) and commitActionHistogram
// (the whole handler, on both the success and the failure path).
//
// Elapsed times are taken with timer_monotonic(), the same clock the Reader actions in this file use for
// their elapsed-time arithmetic. NOTE(review): this handler runs on a thread-pool receiver; now() is the
// flow network clock and is presumably only refreshed by the network thread, which would make it
// unsuitable for microsecond-level deltas here - confirm against flow's now() implementation.
void action(CommitAction& a) {
	const double commitBeginTime = timer_monotonic();
	Standalone<VectorRef<KeyRangeRef>> deletes;
	DeleteVisitor dv(deletes, deletes.arena());
	ASSERT(a.batchToCommit->Iterate(&dv).ok());
	// If there are any range deletes, we should have added them to be deleted.
	ASSERT(!deletes.empty() || !a.batchToCommit->HasDeleteRange());
	rocksdb::WriteOptions options;
	options.sync = !SERVER_KNOBS->ROCKSDB_UNSAFE_AUTO_FSYNC;

	const double writeBeginTime = timer_monotonic();
	auto s = db->Write(options, a.batchToCommit.get());
	writeHistogram->sampleSeconds(timer_monotonic() - writeBeginTime);

	if (!s.ok()) {
		logRocksDBError(s, "Commit");
		a.done.sendError(statusToError(s));
	} else {
		// Signal completion before the compaction hints: they are an optimization, not part of the commit.
		a.done.send(Void());

		const double compactRangeBeginTime = timer_monotonic();
		for (const auto& keyRange : deletes) {
			auto begin = toSlice(keyRange.begin);
			auto end = toSlice(keyRange.end);
			ASSERT(db->SuggestCompactRange(db->DefaultColumnFamily(), &begin, &end).ok());
		}
		deleteCompactRangeHistogram->sampleSeconds(timer_monotonic() - compactRangeBeginTime);
	}
	commitActionHistogram->sampleSeconds(timer_monotonic() - commitBeginTime);
}

struct CloseAction : TypedAction<Writer, CloseAction> {
Expand Down Expand Up @@ -361,8 +397,44 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double readValueTimeout;
double readValuePrefixTimeout;
double readRangeTimeout;

explicit Reader(DB& db) : db(db) {
Reference<Histogram> readRangeActionHistogram;
Reference<Histogram> readValueActionHistogram;
Reference<Histogram> readPrefixActionHistogram;
Reference<Histogram> readRangeQueueWaitHistogram;
Reference<Histogram> readValueQueueWaitHistogram;
Reference<Histogram> readPrefixQueueWaitHistogram;
Reference<Histogram> readRangeNewIteratorHistogram;
Reference<Histogram> readValueGetHistogram;
Reference<Histogram> readPrefixGetHistogram;

explicit Reader(DB& db)
: db(db), readRangeActionHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READRANGE_ACTION_HISTOGRAM,
Histogram::Unit::microseconds)),
readValueActionHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READVALUE_ACTION_HISTOGRAM,
Histogram::Unit::microseconds)),
readPrefixActionHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READPREFIX_ACTION_HISTOGRAM,
Histogram::Unit::microseconds)),
readRangeQueueWaitHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READRANGE_QUEUEWAIT_HISTOGRAM,
Histogram::Unit::microseconds)),
readValueQueueWaitHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READVALUE_QUEUEWAIT_HISTOGRAM,
Histogram::Unit::microseconds)),
readPrefixQueueWaitHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READPREFIX_QUEUEWAIT_HISTOGRAM,
Histogram::Unit::microseconds)),
readRangeNewIteratorHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM,
Histogram::Unit::microseconds)),
readValueGetHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READVALUE_GET_HISTOGRAM,
Histogram::Unit::microseconds)),
readPrefixGetHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READPREFIX_GET_HISTOGRAM,
Histogram::Unit::microseconds)) {
if (g_network->isSimulated()) {
// In simulation, increasing the read operation timeouts to 5 minutes, as some of the tests have
// very high load and single read thread cannot process all the load within the timeouts.
Expand All @@ -388,6 +460,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
};
void action(ReadValueAction& a) {
// a.startTime is subtracted from timer_monotonic() further down in this handler when the read deadline is
// computed, so the queue wait has to be measured against timer_monotonic() as well; subtracting it from
// now() would mix timestamps taken from two different time sources.
readValueQueueWaitHistogram->sampleSeconds(timer_monotonic() - a.startTime);
Optional<TraceBatch> traceBatch;
if (a.debugID.present()) {
traceBatch = { TraceBatch{} };
Expand All @@ -401,13 +474,18 @@ struct RocksDBKeyValueStore : IKeyValueStore {
a.result.sendError(transaction_too_old());
return;
}
double readBeginTime = now();
rocksdb::PinnableSlice value;
auto options = getReadOptions();
uint64_t deadlineMircos =
db->GetEnv()->NowMicros() + (readValueTimeout - (timer_monotonic() - a.startTime)) * 1000000;
std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);

double dbGetBeginTime = now();
auto s = db->Get(options, db->DefaultColumnFamily(), toSlice(a.key), &value);
readValueGetHistogram->sampleSeconds(now() - dbGetBeginTime);

if (a.debugID.present()) {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After");
traceBatch.get().dump();
Expand All @@ -420,6 +498,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
logRocksDBError(s, "ReadValue");
a.result.sendError(statusToError(s));
}
readValueActionHistogram->sampleSeconds(now() - readBeginTime);
}

struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction> {
Expand All @@ -433,6 +512,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
};
void action(ReadValuePrefixAction& a) {
// a.startTime is subtracted from timer_monotonic() further down in this handler when the read deadline is
// computed, so the queue wait has to be measured against timer_monotonic() as well; subtracting it from
// now() would mix timestamps taken from two different time sources.
readPrefixQueueWaitHistogram->sampleSeconds(timer_monotonic() - a.startTime);
Optional<TraceBatch> traceBatch;
if (a.debugID.present()) {
traceBatch = { TraceBatch{} };
Expand All @@ -448,13 +528,18 @@ struct RocksDBKeyValueStore : IKeyValueStore {
a.result.sendError(transaction_too_old());
return;
}
double readBeginTime = now();
rocksdb::PinnableSlice value;
auto options = getReadOptions();
uint64_t deadlineMircos =
db->GetEnv()->NowMicros() + (readValuePrefixTimeout - (timer_monotonic() - a.startTime)) * 1000000;
std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);

double dbGetBeginTime = now();
auto s = db->Get(options, db->DefaultColumnFamily(), toSlice(a.key), &value);
readPrefixGetHistogram->sampleSeconds(now() - dbGetBeginTime);

if (a.debugID.present()) {
traceBatch.get().addEvent("GetValuePrefixDebug",
a.debugID.get().first(),
Expand All @@ -470,6 +555,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
logRocksDBError(s, "ReadValuePrefix");
a.result.sendError(statusToError(s));
}
readPrefixActionHistogram->sampleSeconds(now() - readBeginTime);
}

struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
Expand All @@ -482,6 +568,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
};
void action(ReadRangeAction& a) {
// The timeout check on the next line measures the wait as timer_monotonic() - a.startTime; use the same
// clock here so the histogram and the timeout agree. Subtracting a.startTime from now() would mix
// timestamps taken from two different time sources.
readRangeQueueWaitHistogram->sampleSeconds(timer_monotonic() - a.startTime);
if (timer_monotonic() - a.startTime > readRangeTimeout) {
TraceEvent(SevWarn, "RocksDBError")
.detail("Error", "Read range request timedout")
Expand All @@ -491,6 +578,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return;
}

double readBeginTime = now();
RangeResult result;
if (a.rowLimit == 0 || a.byteLimit == 0) {
a.result.send(result);
Expand All @@ -508,7 +596,11 @@ struct RocksDBKeyValueStore : IKeyValueStore {
if (a.rowLimit >= 0) {
auto endSlice = toSlice(a.keys.end);
options.iterate_upper_bound = &endSlice;

double iterCreationBeginTime = now();
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
readRangeNewIteratorHistogram->sampleSeconds(now() - iterCreationBeginTime);

cursor->Seek(toSlice(a.keys.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
Expand All @@ -532,7 +624,11 @@ struct RocksDBKeyValueStore : IKeyValueStore {
} else {
auto beginSlice = toSlice(a.keys.begin);
options.iterate_lower_bound = &beginSlice;

double iterCreationBeginTime = now();
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
readRangeNewIteratorHistogram->sampleSeconds(now() - iterCreationBeginTime);

cursor->SeekForPrev(toSlice(a.keys.end));
if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
cursor->Prev();
Expand Down Expand Up @@ -569,6 +665,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
result.readThrough = result[result.size() - 1].key;
}
a.result.send(result);
readRangeActionHistogram->sampleSeconds(now() - readBeginTime);
}
};

Expand Down

0 comments on commit eec774b

Please sign in to comment.