Skip to content

Commit

Permalink
fix: Don't remove non-concluding tx from queue on ooo runs (#1427)
Browse files Browse the repository at this point in the history
* fix: Don't remove non-concluding tx from queue on ooo runs

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
  • Loading branch information
dranikpg committed Jun 18, 2023
1 parent 7ec5bd9 commit 6d4d740
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 16 deletions.
15 changes: 11 additions & 4 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
DCHECK(continuation_trans_ == nullptr)
<< continuation_trans_->DebugId() << " when polling " << trans->DebugId();

bool keep = trans->RunInShard(this);
bool keep = trans->RunInShard(this, false);
if (keep) {
return;
}
Expand All @@ -309,7 +309,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
trans = nullptr;

if (continuation_trans_->IsArmedInShard(sid)) {
bool to_keep = continuation_trans_->RunInShard(this);
bool to_keep = continuation_trans_->RunInShard(this, false);
DVLOG(1) << "RunContTrans: " << continuation_trans_->DebugId() << " keep: " << to_keep;
if (!to_keep) {
continuation_trans_ = nullptr;
Expand Down Expand Up @@ -361,7 +361,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
dbg_id = head->DebugId();
}

bool keep = head->RunInShard(this);
bool keep = head->RunInShard(this, false);

// We should not access head from this point since RunInShard callback decrements refcount.
DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
Expand Down Expand Up @@ -390,7 +390,14 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
}
++stats_.ooo_runs;

bool keep = trans->RunInShard(this);
bool txq_ooo = trans_mask & Transaction::OUT_OF_ORDER;
bool keep = trans->RunInShard(this, txq_ooo);

// If the transaction concluded, it must remove itself from the tx queue.
// Otherwise it is required to stay there to keep the relative order.
if (txq_ooo && !trans->IsMulti())
DCHECK_EQ(keep, trans->GetLocalTxqPos(sid) != TxQueue::kEnd);

DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ string Transaction::DebugId() const {
}

// Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue.
bool Transaction::RunInShard(EngineShard* shard) {
bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
DCHECK_GT(run_count_.load(memory_order_relaxed), 0u);
CHECK(cb_ptr_) << DebugId();
DCHECK_GT(txid_, 0u);
Expand Down Expand Up @@ -491,9 +491,11 @@ bool Transaction::RunInShard(EngineShard* shard) {
// at least the coordinator thread owns the reference.
DCHECK_GE(GetUseCount(), 1u);

// we remove tx from tx-queue upon first invocation.
// if it needs to run again it runs via a dedicated continuation_trans_ state in EngineShard.
if (sd.pq_pos != TxQueue::kEnd) {
// If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation
// and successive hops are run by continuation_trans_ in engine shard.
// Otherwise we can remove ourselves only when we're concluding (so no more hops will follow).
bool remove_txq = is_concluding || !txq_ooo;
if (remove_txq && sd.pq_pos != TxQueue::kEnd) {
shard->txq()->Remove(sd.pq_pos);
sd.pq_pos = TxQueue::kEnd;
}
Expand Down Expand Up @@ -1152,6 +1154,7 @@ OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) {
bc->AddWatched(keys, this);

sd.local_mask |= SUSPENDED_Q;
sd.local_mask &= ~OUT_OF_ORDER;
DVLOG(2) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask
<< ", first_key:" << keys.front();

Expand Down
19 changes: 12 additions & 7 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,11 @@ class Transaction {
// Can be used only for single key invocations, because it writes into a shared variable.
template <typename F> auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr));

// Called by EngineShard when performing Execute over the tx queue.
// Called by engine shard to execute a transaction hop.
// txq_ooo is set to true if the transaction is running out of order,
// i.e. not as the tx queue head.
// Returns true if transaction should be kept in the queue.
bool RunInShard(EngineShard* shard);
bool RunInShard(EngineShard* shard, bool txq_ooo);

// Registers the transaction into the watched queue and blocks until either a) a notification is
// received, or b) tp is reached. If tp is time_point::max() then waits indefinitely.
Expand Down Expand Up @@ -243,6 +245,10 @@ class Transaction {
return shard_data_[SidToId(sid)].local_mask;
}

// Returns the pq_pos value stored in this transaction's per-shard data for shard `sid`.
uint32_t GetLocalTxqPos(ShardId sid) const {
  const auto& shard_state = shard_data_[SidToId(sid)];
  return shard_state.pq_pos;
}

TxId txid() const {
return txid_;
}
Expand Down Expand Up @@ -365,11 +371,10 @@ class Transaction {
COORD_SCHED = 1,
COORD_EXEC = 2,

// We are running the last execution step in multi-hop operation.
COORD_EXEC_CONCLUDING = 4,
COORD_BLOCKED = 8,
COORD_CANCELLED = 0x10,
COORD_OOO = 0x20,
COORD_EXEC_CONCLUDING = 1 << 2, // Whether its the last hop of a transaction
COORD_BLOCKED = 1 << 3,
COORD_CANCELLED = 1 << 4,
COORD_OOO = 1 << 5,
};

struct PerShardCache {
Expand Down
32 changes: 31 additions & 1 deletion tests/dragonfly/generic_test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import os
import pytest
import redis
import asyncio
from redis import asyncio as aioredis

from . import dfly_multi_test_args
from . import dfly_multi_test_args, dfly_args
from .utility import batch_fill_data, gen_test_data


Expand Down Expand Up @@ -48,3 +49,32 @@ async def test_password(df_local_factory, export_dfly_password):
client = aioredis.Redis(password=requirepass)
await client.ping()
dfly.stop()


"""
Make sure that multi-hop transactions can't run OOO.
"""

# Lua script passed to EVAL: calls INCR on KEYS[1] once per loop iteration,
# with i running from 0 through ARGV[1] (Lua numeric for-loops include the upper bound).
MULTI_HOPS = """
for i = 0, ARGV[1] do
redis.call('INCR', KEYS[1])
end
"""

@dfly_args({"proactor_threads": 1})
async def test_txq_ooo(async_client: aioredis.Redis, df_server):
    """
    Make sure that multi-hop transactions can't run OOO.

    Runs several EVAL loops of the MULTI_HOPS script concurrently with
    non-transactional pipelines that LPUSH one element and then issue several
    BLPOPs with a very short timeout on a shared list key. There are no explicit
    assertions: the test passes when every command completes without raising.
    """

    async def task1(k, h):
        # `async with` closes the client (and its connections) when the task
        # finishes or fails, so no connections are leaked between tests.
        async with aioredis.Redis(port=df_server.port) as c:
            for _ in range(100):
                await c.eval(MULTI_HOPS, 1, k, h)

    async def task2(k, n):
        async with aioredis.Redis(port=df_server.port) as c:
            for _ in range(100):
                # transaction=False: commands are pipelined without MULTI/EXEC.
                pipe = c.pipeline(transaction=False)
                pipe.lpush(k, 1)
                for _ in range(n):
                    pipe.blpop(k, 0.001)
                await pipe.execute()

    await asyncio.gather(
        task1('i1', 2),
        task1('i2', 3),
        task2('l1', 2),
        task2('l1', 2),
        task2('l1', 5),
    )

0 comments on commit 6d4d740

Please sign in to comment.