diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc
index 7353cacd9e12..b7d25d5a2c92 100644
--- a/src/server/engine_shard.cc
+++ b/src/server/engine_shard.cc
@@ -210,15 +210,20 @@ ShardId Shard(string_view v, ShardId shard_num) {
 }
 
 EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
-  static_assert(sizeof(Stats) == 48);
+  static_assert(sizeof(Stats) == 64);
 
-  defrag_attempt_total += o.defrag_attempt_total;
-  defrag_realloc_total += o.defrag_realloc_total;
-  defrag_task_invocation_total += o.defrag_task_invocation_total;
-  poll_execution_total += o.poll_execution_total;
-  tx_ooo_total += o.tx_ooo_total;
-  tx_optimistic_total += o.tx_optimistic_total;
+#define ADD(x) x += o.x
+  ADD(defrag_attempt_total);
+  ADD(defrag_realloc_total);
+  ADD(defrag_task_invocation_total);
+  ADD(poll_execution_total);
+  ADD(tx_ooo_total);
+  ADD(tx_optimistic_total);
+  ADD(tx_batch_schedule_calls_total);
+  ADD(tx_batch_scheduled_items_total);
+
+#undef ADD
 
   return *this;
 }
diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h
index 262914b2cb4b..0673fa5641f1 100644
--- a/src/server/engine_shard.h
+++ b/src/server/engine_shard.h
@@ -38,6 +38,12 @@ class EngineShard {
     uint64_t tx_optimistic_total = 0;
     uint64_t tx_ooo_total = 0;
 
+    // Number of ScheduleBatchInShard calls.
+    uint64_t tx_batch_schedule_calls_total = 0;
+
+    // Number of transactions scheduled via ScheduleBatchInShard.
+    uint64_t tx_batch_scheduled_items_total = 0;
+
     Stats& operator+=(const Stats&);
   };
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 7ea727f6c748..9d605f090542 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -979,6 +979,8 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
     server_family_.GetDflyCmd()->BreakStalledFlowsInShard();
     server_family_.UpdateMemoryGlobalStats();
   });
+  Transaction::Init(shard_num);
+
   SetOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_oom_deny_ratio));
   SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio));
@@ -1010,6 +1012,7 @@ void Service::Shutdown() {
   shard_set->PreShutdown();
   namespaces.Clear();
   shard_set->Shutdown();
+  Transaction::Shutdown();
 
   pp_.Await([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); });
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index b5a152b16205..401fe5cdd470 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -2440,7 +2440,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
     append("tx_normal_total", m.coordinator_stats.tx_normal_cnt);
     append("tx_inline_runs_total", m.coordinator_stats.tx_inline_runs);
     append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt);
-
+    append("tx_batch_scheduled_items_total", m.shard_stats.tx_batch_scheduled_items_total);
+    append("tx_batch_schedule_calls_total", m.shard_stats.tx_batch_schedule_calls_total);
     append("tx_with_freq", absl::StrJoin(m.coordinator_stats.tx_width_freq_arr, ","));
     append("tx_queue_len", m.tx_queue_len);
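Read together, the two new INFO fields give a quick measure of how well scheduling requests coalesce: each ScheduleBatchInShard call drains whatever accumulated in the shard's queue, so the items/calls ratio approximates the average batch size. A minimal standalone sketch of that computation; the sample values are hypothetical, not taken from a real run:

#include <cstdint>
#include <cstdio>

int main() {
  // Hypothetical snapshot of the two new INFO fields.
  uint64_t schedule_calls = 1200;   // tx_batch_schedule_calls_total
  uint64_t scheduled_items = 9000;  // tx_batch_scheduled_items_total

  // Values near 1 mean batching is not kicking in; larger values mean the
  // per-shard queue is absorbing scheduling contention.
  double avg_batch = schedule_calls == 0
                         ? 0.0
                         : static_cast<double>(scheduled_items) / schedule_calls;
  std::printf("avg transactions per batch: %.2f\n", avg_batch);
  return 0;
}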
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 2fe35e740aa8..a358e4e66f2e 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -6,6 +6,8 @@
 
 #include <absl/strings/match.h>
 
+#include <atomic>
+
 #include "base/flags.h"
 #include "base/logging.h"
 #include "facade/op_status.h"
@@ -86,12 +88,32 @@ uint16_t trans_id(const Transaction* ptr) {
 
 struct ScheduleContext {
   Transaction* trans;
   bool optimistic_execution = false;
+
+  std::atomic<ScheduleContext*> next{nullptr};
+
   std::atomic_uint32_t fail_cnt{0};
 
   ScheduleContext(Transaction* t, bool optimistic) : trans(t), optimistic_execution(optimistic) {
   }
 };
 
+constexpr size_t kAvoidFalseSharingSize = 64;
+struct ScheduleQ {
+  alignas(kAvoidFalseSharingSize) base::MPSCIntrusiveQueue<ScheduleContext> queue;
+  alignas(kAvoidFalseSharingSize) atomic_bool armed{false};
+};
+
+void MPSC_intrusive_store_next(ScheduleContext* dest, ScheduleContext* next_node) {
+  dest->next.store(next_node, std::memory_order_relaxed);
+}
+
+ScheduleContext* MPSC_intrusive_load_next(const ScheduleContext& src) {
+  return src.next.load(std::memory_order_acquire);
+}
+
+// Array of shard_num entries, one ScheduleQ per shard.
+ScheduleQ* schedule_queues = nullptr;
+
 }  // namespace
 
 bool Transaction::BatonBarrier::IsClaimed() const {
@@ -139,6 +161,17 @@ Transaction::Guard::~Guard() {
   tx->Refurbish();
 }
 
+void Transaction::Init(unsigned num_shards) {
+  DCHECK(schedule_queues == nullptr);
+  schedule_queues = new ScheduleQ[num_shards];
+}
+
+void Transaction::Shutdown() {
+  DCHECK(schedule_queues);
+  delete[] schedule_queues;
+  schedule_queues = nullptr;
+}
+
 Transaction::Transaction(const CommandId* cid) : cid_{cid} {
   InitTxTime();
   string_view cmd_name(cid_->name());
@@ -685,11 +718,11 @@ void Transaction::ScheduleInternal() {
   // Try running immediately (during scheduling) if we're concluding and either:
   // - have a single shard, and thus never have to cancel scheduling due to reordering
   // - run as an idempotent command, meaning we can safely repeat the operation if scheduling fails
-  bool can_run_immediately = !IsGlobal() && (coordinator_state_ & COORD_CONCLUDING) &&
-                             (unique_shard_cnt_ == 1 || (cid_->opt_mask() & CO::IDEMPOTENT));
+  bool optimistic_exec = !IsGlobal() && (coordinator_state_ & COORD_CONCLUDING) &&
+                         (unique_shard_cnt_ == 1 || (cid_->opt_mask() & CO::IDEMPOTENT));
 
   DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards "
-           << " immediate run: " << can_run_immediately;
+           << " optimistic_execution: " << optimistic_exec;
 
   auto is_active = [this](uint32_t i) { return IsActive(i); };
@@ -711,29 +744,40 @@ void Transaction::ScheduleInternal() {
     // in the lower-level code. It's not really needed otherwise because we run inline.
 
     // single shard schedule operation can't fail
-    CHECK(ScheduleInShard(EngineShard::tlocal(), can_run_immediately));
+    CHECK(ScheduleInShard(EngineShard::tlocal(), optimistic_exec));
     run_barrier_.Dec();
     break;
   }
 
-  ScheduleContext schedule_ctx{this, can_run_immediately};
+  ScheduleContext schedule_ctx{this, optimistic_exec};
 
-  auto cb = [&schedule_ctx]() {
-    if (!schedule_ctx.trans->ScheduleInShard(EngineShard::tlocal(),
-                                             schedule_ctx.optimistic_execution)) {
-      schedule_ctx.fail_cnt.fetch_add(1, memory_order_relaxed);
+  if (unique_shard_cnt_ == 1) {
+    // Single-shard optimization. Note: we could apply the same optimization
+    // to multi-shard transactions as well by creating a vector of ScheduleContext.
+    schedule_queues[unique_shard_id_].queue.Push(&schedule_ctx);
+
+    bool current_val = false;
+    if (schedule_queues[unique_shard_id_].armed.compare_exchange_strong(current_val, true,
+                                                                        memory_order_acq_rel)) {
+      shard_set->Add(unique_shard_id_, &Transaction::ScheduleBatchInShard);
     }
-    schedule_ctx.trans->FinishHop();
-  };
+  } else {
+    auto cb = [&schedule_ctx]() {
+      if (!schedule_ctx.trans->ScheduleInShard(EngineShard::tlocal(),
+                                               schedule_ctx.optimistic_execution)) {
+        schedule_ctx.fail_cnt.fetch_add(1, memory_order_relaxed);
+      }
+      schedule_ctx.trans->FinishHop();
+    };
 
-  IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); });
+    IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); });
 
-  // Add this debugging function to print more information when we experience deadlock
-  // during tests.
-  ThisFiber::PrintLocalsCallback locals([&] {
-    return absl::StrCat("unique_shard_cnt_: ", unique_shard_cnt_,
-                        " run_barrier_cnt: ", run_barrier_.DEBUG_Count(), "\n");
-  });
+    // Add this debugging function to print more information when we experience deadlock
+    // during tests.
+    ThisFiber::PrintLocalsCallback locals([&] {
+      return absl::StrCat("unique_shard_cnt_: ", unique_shard_cnt_,
+                          " run_barrier_cnt: ", run_barrier_.DEBUG_Count(), "\n");
+    });
+  }
 
   run_barrier_.Wait();
 
   if (schedule_ctx.fail_cnt.load(memory_order_relaxed) == 0) {
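The single-shard branch above follows a push-then-arm pattern: every coordinator pushes its ScheduleContext unconditionally, but only the producer that flips armed from false to true submits a drain callback, so at most one ScheduleBatchInShard task is in flight per shard. Below is a minimal sketch of that producer-side protocol, with a mutex-guarded deque standing in for base::MPSCIntrusiveQueue and a plain function standing in for shard_set->Add; all names are illustrative, not Dragonfly APIs:

#include <atomic>
#include <cstdio>
#include <deque>
#include <functional>
#include <mutex>

struct ShardQueue {
  std::mutex mu;
  std::deque<int> items;  // stands in for queued ScheduleContext pointers
  std::atomic<bool> armed{false};
};

// Stand-in for shard_set->Add: records that a drain task was submitted.
void PostToShard(std::function<void()> task) {
  std::printf("drain task submitted\n");
  task();
}

void SubmitForScheduling(ShardQueue& sq, int item) {
  {
    std::lock_guard<std::mutex> lk(sq.mu);
    sq.items.push_back(item);  // push unconditionally, like queue.Push(&schedule_ctx)
  }

  // Only the producer that wins the false -> true transition posts the drain
  // callback; concurrent producers see armed == true and rely on the already
  // submitted task to pick their item up.
  bool expected = false;
  if (sq.armed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
    PostToShard([&sq] { /* ScheduleBatchInShard would drain sq here */ });
  }
}

int main() {
  ShardQueue sq;
  SubmitForScheduling(sq, 1);  // posts the drain task
  SubmitForScheduling(sq, 2);  // armed is already true: no second task
  return 0;
}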
@@ -1115,6 +1159,45 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) {
   return true;
 }
 
+void Transaction::ScheduleBatchInShard() {
+  EngineShard* shard = EngineShard::tlocal();
+  auto& stats = shard->stats();
+  stats.tx_batch_schedule_calls_total++;
+
+  ShardId sid = shard->shard_id();
+  auto& sq = schedule_queues[sid];
+
+  for (unsigned j = 0;; ++j) {
+    // We pull items from the queue in a loop until we reach the stop condition.
+    // TODO: we may have a fairness problem here: if transactions keep arriving,
+    // we never break out of the loop. It is possible to break early, but it's not trivial,
+    // because we must then ensure that another ScheduleBatchInShard callback is queued.
+    // This can be done by checking that sq.armed is true when j == 1.
+    while (true) {
+      ScheduleContext* item = sq.queue.Pop();
+      if (!item)
+        break;
+
+      if (!item->trans->ScheduleInShard(shard, item->optimistic_execution)) {
+        item->fail_cnt.fetch_add(1, memory_order_relaxed);
+      }
+      item->trans->FinishHop();
+      stats.tx_batch_scheduled_items_total++;
+    }
+
+    // j == 1 means we have already signalled that we're done with the current batch.
+    if (j == 1)
+      break;
+
+    // We signal that we're done with the current batch, but then check once more whether
+    // new transactions were pushed in the meantime. This closes a race in which a producer
+    // adds a transaction to the queue, observes that sq.armed is still true and therefore
+    // skips submitting the callback that would fetch it: without the extra pass, that
+    // transaction would be stranded in the queue.
+    sq.armed.store(false, memory_order_release);
+  }
+}
+
 bool Transaction::CancelShardCb(EngineShard* shard) {
   ShardId idx = SidToId(shard->shard_id());
   auto& sd = shard_data_[idx];
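The shape of the outer loop is what makes disarming safe. A producer may push an item after the consumer's last Pop returned null but before armed is cleared; that producer observes armed == true and does not submit a new callback. Because the loop breaks only when j == 1, the queue is drained once more after the store, so such an item is still picked up. A deterministic, single-threaded walkthrough of exactly that interleaving, as a simplified model rather than Dragonfly code:

#include <atomic>
#include <cstdio>
#include <deque>

std::deque<int> queue_;         // the shard's schedule queue
std::atomic<bool> armed{true};  // a drain task has been submitted

// One inner pass of the drain loop: pop until the queue is empty.
void DrainOnce() {
  while (!queue_.empty()) {
    std::printf("scheduled %d\n", queue_.front());
    queue_.pop_front();
  }
}

int main() {
  queue_.push_back(1);

  // Consumer: the first inner pass drains item 1 and finds the queue empty.
  DrainOnce();

  // Producer interleaves here: it pushes item 2, sees armed == true and
  // therefore does NOT submit another drain callback.
  queue_.push_back(2);

  // Consumer: disarms, then performs the extra pass (the j-loop) and picks
  // up item 2. Without this second pass, item 2 would be stranded until an
  // unrelated transaction re-armed the queue.
  armed.store(false, std::memory_order_release);
  DrainOnce();
  return 0;
}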
diff --git a/src/server/transaction.h b/src/server/transaction.h
index b15a022a6ad8..5e209eb96d98 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -178,6 +178,9 @@ class Transaction {
     Transaction* tx;
   };
 
+  static void Init(unsigned num_shards);
+  static void Shutdown();
+
   explicit Transaction(const CommandId* cid);
 
   // Initialize transaction for squashing placed on a specific shard with a given parent tx
@@ -515,6 +518,9 @@ class Transaction {
   // subject to uncontended keys.
   bool ScheduleInShard(EngineShard* shard, bool execute_optimistic);
 
+  // Optimized extension of ScheduleInShard. Pulls several transactions queued for scheduling.
+  static void ScheduleBatchInShard();
+
   // Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier
   void DispatchHop();
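A note on the declarations above: making ScheduleBatchInShard static is what allows ScheduleInternal to pass &Transaction::ScheduleBatchInShard as a capture-free callback; the function carries no per-transaction state and recovers everything from the shard thread it runs on. A compact sketch of that design, where Shard, tl_shard and Task are hypothetical stand-ins rather than Dragonfly types:

#include <cstdio>

struct Shard {
  int id;
};

// Stand-in for EngineShard::tlocal().
thread_local Shard* tl_shard = nullptr;

// A capture-free callback slot, like the callable accepted by
// shard_set->Add(sid, ...): cheap to copy and queue, nothing to allocate.
using Task = void (*)();

struct Scheduler {
  // Static: all context comes from the shard thread it runs on.
  static void ScheduleBatchInShard() {
    std::printf("draining schedule queue of shard %d\n", tl_shard->id);
  }
};

int main() {
  Shard shard0{0};
  tl_shard = &shard0;  // on a real shard thread this is set up at startup

  Task task = &Scheduler::ScheduleBatchInShard;  // no state captured
  task();
  return 0;
}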