refactor: reduce memory copying during incremental synchronization #2689

Merged: 2 commits, Dec 11, 2024
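
The change hinges on one property of the RocksDB API: rocksdb::WriteBatch can adopt an already-serialized representation by moving a std::string into it, so the replica can build the batch object once and hand it around by pointer instead of copying the raw bytes for every consumer. A minimal sketch of that idea, with illustrative names (ApplyReceivedBatch, db, received_payload) that are not part of the patch:

// Sketch only: apply a serialized replication payload without re-copying it.
#include <string>
#include <utility>

#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"

rocksdb::Status ApplyReceivedBatch(rocksdb::DB *db, std::string &&received_payload) {
  // WriteBatch accepts the serialized representation by rvalue reference,
  // so the payload's buffer is moved in rather than copied.
  rocksdb::WriteBatch batch(std::move(received_payload));

  // The same object can later be inspected via batch.Data() or batch.Iterate(...)
  // without rebuilding it from a raw string.
  return db->Write(rocksdb::WriteOptions(), &batch);
}
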
56 changes: 32 additions & 24 deletions src/cluster/replication.cc
@@ -30,13 +30,16 @@
#include <atomic>
#include <csignal>
#include <future>
#include <memory>
#include <string>
#include <string_view>
#include <thread>

#include "commands/error_constants.h"
#include "event_util.h"
#include "fmt/format.h"
#include "io_util.h"
#include "rocksdb/write_batch.h"
#include "rocksdb_crc32c.h"
#include "scope_exit.h"
#include "server/redis_reply.h"
@@ -557,7 +560,6 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
}

ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *bev) {
char *bulk_data = nullptr;
repl_state_.store(kReplConnected, std::memory_order_relaxed);
auto input = bufferevent_get_input(bev);
while (true) {
@@ -576,31 +578,38 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
}
case Incr_batch_data:
// Read bulk data (batch data)
if (incr_bulk_len_ + 2 <= evbuffer_get_length(input)) { // We got enough data
bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input, static_cast<ssize_t>(incr_bulk_len_ + 2)));
std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
if (incr_bulk_len_ + 2 > evbuffer_get_length(input)) { // If data not enough
return CBState::AGAIN;
}

const char *bulk_data =
reinterpret_cast<const char *>(evbuffer_pullup(input, static_cast<ssize_t>(incr_bulk_len_ + 2)));
std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
evbuffer_drain(input, incr_bulk_len_ + 2);
Review comment (Member):

Not directly related to this issue: I'm not familiar with the evbuffer API, but would

std::string bulk_string(incr_bulk_len_ + 2, 0);
evbuffer_remove(input, bulk_string.data(), incr_bulk_len_ + 2);

be OK, since it avoids adjusting the input buffer's internals? (The bad part is that std::string would zero-initialize itself, which introduces an extra round of copying 😅) @git-hulk @PragmaTwice
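
For context, a rough self-contained version of what that suggestion describes, using standard libevent evbuffer calls; it only illustrates the trade-off and is not code from the PR (ReadBulk and its parameters are invented names):

#include <string>

#include <event2/buffer.h>

// Read one bulk payload of incr_bulk_len bytes (plus the trailing CRLF) by
// copying it out with evbuffer_remove instead of linearizing the buffer with
// evbuffer_pullup. Returns false if the data has not fully arrived yet.
bool ReadBulk(evbuffer *input, size_t incr_bulk_len, std::string *out) {
  if (incr_bulk_len + 2 > evbuffer_get_length(input)) return false;

  std::string bulk(incr_bulk_len + 2, '\0');         // zero-initialized: the extra copy noted above
  evbuffer_remove(input, bulk.data(), bulk.size());  // drains payload + CRLF from the evbuffer
  bulk.resize(incr_bulk_len);                        // drop the CRLF terminator
  *out = std::move(bulk);
  return true;
}
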

Review comment (Member):

oh let's not optimize this if no benchmark shows it's better...

incr_state_ = Incr_batch_size;

if (bulk_string == "ping") {
// master would send the ping heartbeat packet to check whether the slave was alive or not,
// don't write ping to db here.
if (bulk_string != "ping") {
auto s = storage_->ReplicaApplyWriteBatch(std::string(bulk_data, incr_bulk_len_));
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
<< util::StringToHex(bulk_string);
return CBState::RESTART;
}

s = parseWriteBatch(bulk_string);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(bulk_string)
<< ": " << s.Msg();
return CBState::RESTART;
}
}
evbuffer_drain(input, incr_bulk_len_ + 2);
incr_state_ = Incr_batch_size;
} else {
return CBState::AGAIN;
}

rocksdb::WriteBatch batch(std::move(bulk_string));

auto s = storage_->ReplicaApplyWriteBatch(&batch);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
<< util::StringToHex(batch.Data());
return CBState::RESTART;
}

s = parseWriteBatch(batch);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(batch.Data())
<< ": " << s.Msg();
return CBState::RESTART;
}

break;
}
}
@@ -981,8 +990,7 @@ void ReplicationThread::TimerCB(int, int16_t) {
}
}

Status ReplicationThread::parseWriteBatch(const std::string &batch_string) {
rocksdb::WriteBatch write_batch(batch_string);
Status ReplicationThread::parseWriteBatch(const rocksdb::WriteBatch &write_batch) {
WriteBatchHandler write_batch_handler;

auto db_status = write_batch.Iterate(&write_batch_handler);
3 changes: 2 additions & 1 deletion src/cluster/replication.h
@@ -33,6 +33,7 @@

#include "event_util.h"
#include "io_util.h"
#include "rocksdb/write_batch.h"
#include "server/redis_connection.h"
#include "status.h"
#include "storage/storage.h"
@@ -209,7 +210,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
static bool isWrongPsyncNum(std::string_view err);
static bool isUnknownOption(std::string_view err);

Status parseWriteBatch(const std::string &batch_string);
Status parseWriteBatch(const rocksdb::WriteBatch &write_batch);
};

/*
16 changes: 11 additions & 5 deletions src/storage/storage.cc
@@ -43,6 +43,8 @@
#include "redis_db.h"
#include "redis_metadata.h"
#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/write_batch.h"
#include "rocksdb_crc32c.h"
#include "server/server.h"
#include "storage/batch_indexer.h"
@@ -766,22 +768,26 @@ rocksdb::Status Storage::FlushScripts(engine::Context &ctx, const rocksdb::Write
return Write(ctx, options, batch->GetWriteBatch());
}

Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
return ApplyWriteBatch(default_write_opts_, std::move(raw_batch));
Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) {
return applyWriteBatch(default_write_opts_, batch);
}

Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
Status Storage::applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch) {
if (db_size_limit_reached_) {
return {Status::NotOK, "reach space limit"};
}
auto batch = rocksdb::WriteBatch(std::move(raw_batch));
auto s = db_->Write(options, &batch);
auto s = db_->Write(options, batch);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
return Status::OK();
}

Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
auto batch = rocksdb::WriteBatch(std::move(raw_batch));
return applyWriteBatch(options, &batch);
}

void Storage::RecordStat(StatType type, uint64_t v) {
switch (type) {
case StatType::FlushCount:
6 changes: 4 additions & 2 deletions src/storage/storage.h
@@ -41,6 +41,7 @@
#include "config/config.h"
#include "lock_manager.h"
#include "observer_or_unique.h"
#include "rocksdb/write_batch.h"
#include "status.h"

#if defined(__sparc__) || defined(__arm__)
@@ -230,7 +231,7 @@ class Storage {
Status RestoreFromBackup();
Status RestoreFromCheckpoint();
Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
Status ReplicaApplyWriteBatch(std::string &&raw_batch);
Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch);
Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch);
rocksdb::SequenceNumber LatestSeqNumber();

@@ -380,13 +381,14 @@
// command, so it won't have multi transactions to be executed at the same time.
std::unique_ptr<rocksdb::WriteBatchWithIndex> txn_write_batch_;

rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions();
rocksdb::WriteOptions default_write_opts_;

// rocksdb used global block cache
std::shared_ptr<rocksdb::Cache> shared_block_cache_;

rocksdb::Status writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s);
Status applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch);
};

/// Context passes fixed snapshot and batch between APIs
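
Taken together with the replication.cc changes, the Storage side now follows a simple delegation pattern: the public string-based overload adopts the serialized bytes once, and both entry points funnel into a single private helper that takes a rocksdb::WriteBatch*. A stripped-down illustration of that shape; MiniStorage and its members are invented for the example and stand in for the real Storage class:

#include <string>
#include <utility>

#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"

class MiniStorage {
 public:
  explicit MiniStorage(rocksdb::DB *db) : db_(db) {}

  // Replica path: the caller has already materialized the batch.
  rocksdb::Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) {
    return applyWriteBatch(rocksdb::WriteOptions(), batch);
  }

  // Generic path: adopt the serialized bytes without copying, then delegate.
  rocksdb::Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
    rocksdb::WriteBatch batch(std::move(raw_batch));
    return applyWriteBatch(options, &batch);
  }

 private:
  // Single write path shared by both overloads.
  rocksdb::Status applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch) {
    return db_->Write(options, batch);
  }

  rocksdb::DB *db_;
};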