chore: schedule chains (#3819)
Use an intrusive queue that allows batching of scheduling calls instead of handling each call separately.
This optimization improves latency and throughput by 3-5%.
In addition, we expose batching statistics in the transaction section of the INFO output.
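The pattern in a minimal, self-contained C++ sketch (illustrative only: Node, ShardQueue, Push and DrainBatch are invented names; a lock-free LIFO stack stands in for the FIFO base::MPSCIntrusiveQueue, and printf stands in for the actual scheduling work):

#include <atomic>
#include <cstdio>

struct Node {
  int payload;
  std::atomic<Node*> next{nullptr};  // intrusive link: pushing allocates nothing
};

struct ShardQueue {
  alignas(64) std::atomic<Node*> head{nullptr};  // multi-producer lock-free stack
  alignas(64) std::atomic<bool> armed{false};    // true while a drain task is pending
};

// Pushes a node; returns true iff the caller must submit the (single) drain task.
bool Push(ShardQueue& q, Node* n) {
  Node* old_head = q.head.load(std::memory_order_relaxed);
  do {
    n->next.store(old_head, std::memory_order_relaxed);
  } while (!q.head.compare_exchange_weak(old_head, n, std::memory_order_release,
                                         std::memory_order_relaxed));
  bool expected = false;
  return q.armed.compare_exchange_strong(expected, true, std::memory_order_acq_rel);
}

// Drains the whole batch. Mirrors the j == 1 logic of ScheduleBatchInShard below:
// drain, disarm, then drain once more to close the race with concurrent producers.
void DrainBatch(ShardQueue& q) {
  for (unsigned j = 0;; ++j) {
    Node* n = q.head.exchange(nullptr, std::memory_order_acquire);
    while (n) {
      Node* next = n->next.load(std::memory_order_relaxed);
      std::printf("scheduling item %d\n", n->payload);  // stand-in for ScheduleInShard
      n = next;
    }
    if (j == 1)  // second pass done; exit with armed already cleared
      break;
    q.armed.store(false, std::memory_order_release);
  }
}

int main() {
  ShardQueue q;
  Node a{1}, b{2};
  if (Push(q, &a))  // first push arms the queue: one drain task for many pushes
    DrainBatch(q);  // in Dragonfly this would be shard_set->Add(sid, callback)
  if (Push(q, &b))
    DrainBatch(q);
  return 0;
}

The second drain pass matters: a producer may push after the task's last pop yet still observe armed == true and skip submitting a new task; the extra pass picks that item up.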

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
romange authored Oct 11, 2024
1 parent e71f083 commit 5d2c308
Showing 6 changed files with 130 additions and 26 deletions.
19 changes: 12 additions & 7 deletions src/server/engine_shard.cc
@@ -210,15 +210,20 @@ ShardId Shard(string_view v, ShardId shard_num) {
}

EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
static_assert(sizeof(Stats) == 48);
static_assert(sizeof(Stats) == 64);

defrag_attempt_total += o.defrag_attempt_total;
defrag_realloc_total += o.defrag_realloc_total;
defrag_task_invocation_total += o.defrag_task_invocation_total;
poll_execution_total += o.poll_execution_total;
tx_ooo_total += o.tx_ooo_total;
tx_optimistic_total += o.tx_optimistic_total;
#define ADD(x) x += o.x

ADD(defrag_attempt_total);
ADD(defrag_realloc_total);
ADD(defrag_task_invocation_total);
ADD(poll_execution_total);
ADD(tx_ooo_total);
ADD(tx_optimistic_total);
ADD(tx_batch_schedule_calls_total);
ADD(tx_batch_scheduled_items_total);

#undef ADD
return *this;
}

6 changes: 6 additions & 0 deletions src/server/engine_shard.h
@@ -38,6 +38,12 @@ class EngineShard {
uint64_t tx_optimistic_total = 0;
uint64_t tx_ooo_total = 0;

// Number of ScheduleBatchInShard calls.
uint64_t tx_batch_schedule_calls_total = 0;

// Number of transactions scheduled via ScheduleBatchInShard.
uint64_t tx_batch_scheduled_items_total = 0;

Stats& operator+=(const Stats&);
};

3 changes: 3 additions & 0 deletions src/server/main_service.cc
@@ -979,6 +979,8 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
server_family_.GetDflyCmd()->BreakStalledFlowsInShard();
server_family_.UpdateMemoryGlobalStats();
});
Transaction::Init(shard_num);

SetOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_oom_deny_ratio));
SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio));

@@ -1010,6 +1012,7 @@ void Service::Shutdown() {
shard_set->PreShutdown();
namespaces.Clear();
shard_set->Shutdown();
Transaction::Shutdown();

pp_.Await([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); });

3 changes: 2 additions & 1 deletion src/server/server_family.cc
@@ -2440,7 +2440,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("tx_normal_total", m.coordinator_stats.tx_normal_cnt);
append("tx_inline_runs_total", m.coordinator_stats.tx_inline_runs);
append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt);

append("tx_batch_scheduled_items_total", m.shard_stats.tx_batch_scheduled_items_total);
append("tx_batch_schedule_calls_total", m.shard_stats.tx_batch_schedule_calls_total);
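// Their ratio (scheduled_items / schedule_calls) gives the average scheduling batch size.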
append("tx_with_freq", absl::StrJoin(m.coordinator_stats.tx_width_freq_arr, ","));
append("tx_queue_len", m.tx_queue_len);

119 changes: 101 additions & 18 deletions src/server/transaction.cc
@@ -6,6 +6,8 @@

#include <absl/strings/match.h>

#include <new>

#include "base/flags.h"
#include "base/logging.h"
#include "facade/op_status.h"
@@ -86,12 +88,32 @@ uint16_t trans_id(const Transaction* ptr) {
struct ScheduleContext {
Transaction* trans;
bool optimistic_execution = false;

std::atomic<ScheduleContext*> next{nullptr};

std::atomic_uint32_t fail_cnt{0};

ScheduleContext(Transaction* t, bool optimistic) : trans(t), optimistic_execution(optimistic) {
}
};

constexpr size_t kAvoidFalseSharingSize = 64;
struct ScheduleQ {
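// Keep each member on its own cache line so producers pushing to `queue`
// do not false-share with the consumer toggling `armed`.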
alignas(kAvoidFalseSharingSize) base::MPSCIntrusiveQueue<ScheduleContext> queue;
alignas(kAvoidFalseSharingSize) atomic_bool armed{false};
};

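// Hooks used by base::MPSCIntrusiveQueue<ScheduleContext> to link nodes through
// their embedded `next` pointers.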
void MPSC_intrusive_store_next(ScheduleContext* dest, ScheduleContext* next_node) {
dest->next.store(next_node, std::memory_order_relaxed);
}

ScheduleContext* MPSC_intrusive_load_next(const ScheduleContext& src) {
return src.next.load(std::memory_order_acquire);
}

// Array of size shard_num, one entry per shard.
ScheduleQ* schedule_queues = nullptr;

} // namespace

bool Transaction::BatonBarrier::IsClaimed() const {
@@ -139,6 +161,17 @@ Transaction::Guard::~Guard() {
tx->Refurbish();
}

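// Called once from Service::Init / Service::Shutdown (see main_service.cc above)
// to create and destroy the per-shard schedule queues.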
void Transaction::Init(unsigned num_shards) {
DCHECK(schedule_queues == nullptr);
schedule_queues = new ScheduleQ[num_shards];
}

void Transaction::Shutdown() {
DCHECK(schedule_queues);
delete[] schedule_queues;
schedule_queues = nullptr;
}

Transaction::Transaction(const CommandId* cid) : cid_{cid} {
InitTxTime();
string_view cmd_name(cid_->name());
@@ -685,11 +718,11 @@ void Transaction::ScheduleInternal() {
// Try running immediately (during scheduling) if we're concluding and either:
// - have a single shard, and thus never have to cancel scheduling due to reordering
// - run as an idempotent command, meaning we can safely repeat the operation if scheduling fails
bool can_run_immediately = !IsGlobal() && (coordinator_state_ & COORD_CONCLUDING) &&
(unique_shard_cnt_ == 1 || (cid_->opt_mask() & CO::IDEMPOTENT));
bool optimistic_exec = !IsGlobal() && (coordinator_state_ & COORD_CONCLUDING) &&
(unique_shard_cnt_ == 1 || (cid_->opt_mask() & CO::IDEMPOTENT));

DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards "
<< " immediate run: " << can_run_immediately;
<< " optimistic_execution: " << optimistic_exec;

auto is_active = [this](uint32_t i) { return IsActive(i); };

@@ -711,29 +744,40 @@
// in the lower-level code. It's not really needed otherwise because we run inline.

// A single-shard schedule operation can't fail.
CHECK(ScheduleInShard(EngineShard::tlocal(), can_run_immediately));
CHECK(ScheduleInShard(EngineShard::tlocal(), optimistic_exec));
run_barrier_.Dec();
break;
}

ScheduleContext schedule_ctx{this, can_run_immediately};
ScheduleContext schedule_ctx{this, optimistic_exec};

auto cb = [&schedule_ctx]() {
if (!schedule_ctx.trans->ScheduleInShard(EngineShard::tlocal(),
schedule_ctx.optimistic_execution)) {
schedule_ctx.fail_cnt.fetch_add(1, memory_order_relaxed);
if (unique_shard_cnt_ == 1) {
// Single shard optimization. Note: we could apply the same optimization
// to multi-shard transactions as well by creating a vector of ScheduleContext.
schedule_queues[unique_shard_id_].queue.Push(&schedule_ctx);
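// Only the producer that flips `armed` from false to true submits the drain
// callback, so concurrent scheduling calls coalesce into a single hop.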
bool current_val = false;
if (schedule_queues[unique_shard_id_].armed.compare_exchange_strong(current_val, true,
memory_order_acq_rel)) {
shard_set->Add(unique_shard_id_, &Transaction::ScheduleBatchInShard);
}
schedule_ctx.trans->FinishHop();
};
} else {
auto cb = [&schedule_ctx]() {
if (!schedule_ctx.trans->ScheduleInShard(EngineShard::tlocal(),
schedule_ctx.optimistic_execution)) {
schedule_ctx.fail_cnt.fetch_add(1, memory_order_relaxed);
}
schedule_ctx.trans->FinishHop();
};

IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); });
IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); });

// Add this debugging function to print more information when we experience deadlock
// during tests.
ThisFiber::PrintLocalsCallback locals([&] {
return absl::StrCat("unique_shard_cnt_: ", unique_shard_cnt_,
" run_barrier_cnt: ", run_barrier_.DEBUG_Count(), "\n");
});
// Add this debugging function to print more information when we experience deadlock
// during tests.
ThisFiber::PrintLocalsCallback locals([&] {
return absl::StrCat("unique_shard_cnt_: ", unique_shard_cnt_,
" run_barrier_cnt: ", run_barrier_.DEBUG_Count(), "\n");
});
}
run_barrier_.Wait();

if (schedule_ctx.fail_cnt.load(memory_order_relaxed) == 0) {
@@ -1115,6 +1159,45 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) {
return true;
}

void Transaction::ScheduleBatchInShard() {
EngineShard* shard = EngineShard::tlocal();
auto& stats = shard->stats();
stats.tx_batch_schedule_calls_total++;

ShardId sid = shard->shard_id();
auto& sq = schedule_queues[sid];

for (unsigned j = 0;; ++j) {
// We pull items from the queue in a loop until we reach the stop condition.
// TODO: we may have a fairness problem here: if transactions keep being added,
// we never break out of the loop. Breaking early is possible but not trivial,
// because we must ensure that another ScheduleBatchInShard callback is queued.
// This can be verified by testing whether sq.armed is true when j == 1.
while (true) {
ScheduleContext* item = sq.queue.Pop();
if (!item)
break;

if (!item->trans->ScheduleInShard(shard, item->optimistic_execution)) {
item->fail_cnt.fetch_add(1, memory_order_relaxed);
}
item->trans->FinishHop();
stats.tx_batch_scheduled_items_total++;
}

// j==1 means we already signalled that we're done with the current batch.
if (j == 1)
break;

// We signal that we are done with the current batch, but then check the queue again
// on the next iteration in case more transactions were added in the meantime.
// This avoids the data race where a transaction is pushed to the queue after our last
// Pop, while its producer still observes sq.armed as true and therefore skips
// submitting the callback that would fetch it.
sq.armed.store(false, memory_order_release);
}
}

bool Transaction::CancelShardCb(EngineShard* shard) {
ShardId idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];
6 changes: 6 additions & 0 deletions src/server/transaction.h
@@ -178,6 +178,9 @@ class Transaction {
Transaction* tx;
};

static void Init(unsigned num_shards);
static void Shutdown();

explicit Transaction(const CommandId* cid);

// Initialize transaction for squashing placed on a specific shard with a given parent tx
@@ -515,6 +518,9 @@ class Transaction {
// subject to uncontended keys.
bool ScheduleInShard(EngineShard* shard, bool execute_optimistic);

// Optimized extension of ScheduleInShard. Pulls several transactions queued for scheduling.
static void ScheduleBatchInShard();

// Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier
void DispatchHop();

