Skip to content

Commit

Permalink
refactor(replication): Reduce memory copying during incremental synch…
Browse files Browse the repository at this point in the history
…ronization

Optimized the logic for handling Psync incremental data on replica nodes, reducing an unnecessary data copy and lowering the loop complexity in the corresponding logic.
  • Loading branch information
RiversJin committed Dec 11, 2024
1 parent 698c3d4 commit 6b6eedf
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 32 deletions.
55 changes: 31 additions & 24 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@
#include <atomic>
#include <csignal>
#include <future>
#include <memory>
#include <string>
#include <string_view>
#include <thread>

#include "commands/error_constants.h"
#include "event_util.h"
#include "fmt/format.h"
#include "io_util.h"
#include "rocksdb/write_batch.h"
#include "rocksdb_crc32c.h"
#include "scope_exit.h"
#include "server/redis_reply.h"
Expand Down Expand Up @@ -557,7 +560,6 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
}

ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *bev) {
char *bulk_data = nullptr;
repl_state_.store(kReplConnected, std::memory_order_relaxed);
auto input = bufferevent_get_input(bev);
while (true) {
Expand All @@ -576,31 +578,37 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
}
case Incr_batch_data:
// Read bulk data (batch data)
if (incr_bulk_len_ + 2 <= evbuffer_get_length(input)) { // We got enough data
bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input, static_cast<ssize_t>(incr_bulk_len_ + 2)));
std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
if (incr_bulk_len_ + 2 > evbuffer_get_length(input)) { // If data not enough
return CBState::AGAIN;
}

const char* bulk_data = reinterpret_cast<const char*>(evbuffer_pullup(input, static_cast<ssize_t>(incr_bulk_len_ + 2)));
std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
evbuffer_drain(input, incr_bulk_len_ + 2);
incr_state_ = Incr_batch_size;

if (bulk_string == "ping") {
// master would send the ping heartbeat packet to check whether the slave was alive or not,
// don't write ping to db here.
if (bulk_string != "ping") {
auto s = storage_->ReplicaApplyWriteBatch(std::string(bulk_data, incr_bulk_len_));
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
<< util::StringToHex(bulk_string);
return CBState::RESTART;
}

s = parseWriteBatch(bulk_string);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(bulk_string)
<< ": " << s.Msg();
return CBState::RESTART;
}
}
evbuffer_drain(input, incr_bulk_len_ + 2);
incr_state_ = Incr_batch_size;
} else {
return CBState::AGAIN;
}

rocksdb::WriteBatch batch(std::move(bulk_string));

auto s = storage_->ReplicaApplyWriteBatch(&batch);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
<< util::StringToHex(batch.Data());
return CBState::RESTART;
}

s = parseWriteBatch(batch);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(batch.Data())
<< ": " << s.Msg();
return CBState::RESTART;
}

break;
}
}
Expand Down Expand Up @@ -981,8 +989,7 @@ void ReplicationThread::TimerCB(int, int16_t) {
}
}

Status ReplicationThread::parseWriteBatch(const std::string &batch_string) {
rocksdb::WriteBatch write_batch(batch_string);
Status ReplicationThread::parseWriteBatch(const rocksdb::WriteBatch& write_batch) {
WriteBatchHandler write_batch_handler;

auto db_status = write_batch.Iterate(&write_batch_handler);
Expand Down
3 changes: 2 additions & 1 deletion src/cluster/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include "event_util.h"
#include "io_util.h"
#include "rocksdb/write_batch.h"
#include "server/redis_connection.h"
#include "status.h"
#include "storage/storage.h"
Expand Down Expand Up @@ -209,7 +210,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
static bool isWrongPsyncNum(std::string_view err);
static bool isUnknownOption(std::string_view err);

Status parseWriteBatch(const std::string &batch_string);
Status parseWriteBatch(const rocksdb::WriteBatch &write_batch);
};

/*
Expand Down
16 changes: 11 additions & 5 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
#include "redis_db.h"
#include "redis_metadata.h"
#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/write_batch.h"
#include "rocksdb_crc32c.h"
#include "server/server.h"
#include "storage/batch_indexer.h"
Expand Down Expand Up @@ -766,22 +768,26 @@ rocksdb::Status Storage::FlushScripts(engine::Context &ctx, const rocksdb::Write
return Write(ctx, options, batch->GetWriteBatch());
}

Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
return ApplyWriteBatch(default_write_opts_, std::move(raw_batch));
Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) {
return applyWriteBatch(default_write_opts_, batch);
}

Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
Status Storage::applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch* batch){
if (db_size_limit_reached_) {
return {Status::NotOK, "reach space limit"};
}
auto batch = rocksdb::WriteBatch(std::move(raw_batch));
auto s = db_->Write(options, &batch);
auto s = db_->Write(options, batch);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
return Status::OK();
}

Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
auto batch = rocksdb::WriteBatch(std::move(raw_batch));
return applyWriteBatch(options, &batch);
}

void Storage::RecordStat(StatType type, uint64_t v) {
switch (type) {
case StatType::FlushCount:
Expand Down
6 changes: 4 additions & 2 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "config/config.h"
#include "lock_manager.h"
#include "observer_or_unique.h"
#include "rocksdb/write_batch.h"
#include "status.h"

#if defined(__sparc__) || defined(__arm__)
Expand Down Expand Up @@ -230,7 +231,7 @@ class Storage {
Status RestoreFromBackup();
Status RestoreFromCheckpoint();
Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
Status ReplicaApplyWriteBatch(std::string &&raw_batch);
Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch);
Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch);
rocksdb::SequenceNumber LatestSeqNumber();

Expand Down Expand Up @@ -380,13 +381,14 @@ class Storage {
// command, so it won't have multi transactions to be executed at the same time.
std::unique_ptr<rocksdb::WriteBatchWithIndex> txn_write_batch_;

rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions();
rocksdb::WriteOptions default_write_opts_;

// rocksdb used global block cache
std::shared_ptr<rocksdb::Cache> shared_block_cache_;

rocksdb::Status writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s);
Status applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch);
};

/// Context passes fixed snapshot and batch between APIs
Expand Down

0 comments on commit 6b6eedf

Please sign in to comment.