Skip to content

Commit

Permalink
fix: corruption in replication stream (#3344)
Browse files Browse the repository at this point in the history
Before it was possible to issue several concurrent AsyncWrite requests.
But these are not atomic, which leads to replication stream corruption.
Now we wait for the previous request to finish before sending the next one.

ThrottleIfNeeded is now takes into account pending buffer size for throttling.

Fixes #3329

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored Jul 20, 2024
1 parent fb7782b commit 7b2603a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 37 deletions.
70 changes: 34 additions & 36 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ iovec IoVec(io::Bytes src) {
return iovec{const_cast<uint8_t*>(src.data()), src.size()};
}

constexpr size_t kFlushThreshold = 2_KB;
uint32_t replication_stream_output_limit_cached = 64_KB;

} // namespace
Expand Down Expand Up @@ -90,44 +89,42 @@ void JournalStreamer::Write(std::string_view str) {
DCHECK(!str.empty());
DVLOG(2) << "Writing " << str.size() << " bytes";

// If we do not have any in flight requests we send the string right a way.
// We can not aggregate it since we do not know when the next update will follow.
size_t total_pending = pending_buf_.size() + str.size();
if (in_flight_bytes_ == 0 || total_pending > kFlushThreshold) {
// because of potential SOO with strings we allocate explicitly on heap
uint8_t* buf(new uint8_t[str.size()]);

// TODO: it is possible to remove these redundant copies if we adjust high level
// interfaces to pass reference-counted buffers.
memcpy(buf, str.data(), str.size());
in_flight_bytes_ += total_pending;
if (in_flight_bytes_ > 0) {
// We can not flush data while there are in flight requests because AsyncWrite
// is not atomic. Therefore, we just aggregate.
size_t tail = pending_buf_.size();
pending_buf_.resize(pending_buf_.size() + str.size());
memcpy(pending_buf_.data() + tail, str.data(), str.size());
return;
}

iovec v[2];
unsigned next_buf_id = 0;
// If we do not have any in flight requests we send the string right a way.
// We can not aggregate it since we do not know when the next update will follow.
// because of potential SOO with strings, we allocate explicitly on heap.
uint8_t* buf(new uint8_t[str.size()]);

if (!pending_buf_.empty()) {
v[0] = IoVec(pending_buf_);
++next_buf_id;
}
v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));
// TODO: it is possible to remove these redundant copies if we adjust high level
// interfaces to pass reference-counted buffers.
memcpy(buf, str.data(), str.size());
in_flight_bytes_ += total_pending;
total_sent_ += total_pending;

dest_->AsyncWrite(
v, next_buf_id,
[buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
delete[] buf;
OnCompletion(ec, len);
});
iovec v[2];
unsigned next_buf_id = 0;

return;
if (!pending_buf_.empty()) {
v[0] = IoVec(pending_buf_);
++next_buf_id;
}
v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));

DCHECK_GT(in_flight_bytes_, 0u);
DCHECK_LE(pending_buf_.size() + str.size(), kFlushThreshold);

// Aggregate
size_t tail = pending_buf_.size();
pending_buf_.resize(pending_buf_.size() + str.size());
memcpy(pending_buf_.data() + tail, str.data(), str.size());
dest_->AsyncWrite(
v, next_buf_id,
[buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
delete[] buf;
OnCompletion(ec, len);
});
}

void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
Expand Down Expand Up @@ -160,13 +157,14 @@ void JournalStreamer::ThrottleIfNeeded() {

auto next = chrono::steady_clock::now() +
chrono::milliseconds(absl::GetFlag(FLAGS_replication_stream_timeout));
auto inflight_start = in_flight_bytes_;
size_t inflight_start = in_flight_bytes_;
size_t sent_start = total_sent_;

std::cv_status status =
waker_.await_until([this]() { return !IsStalled() || IsStopped(); }, next);
if (status == std::cv_status::timeout) {
LOG(WARNING) << "Stream timed out, inflight bytes start: " << inflight_start
<< ", end: " << in_flight_bytes_;
LOG(WARNING) << "Stream timed out, inflight bytes/sent start: " << inflight_start << "/"
<< sent_start << ", end: " << in_flight_bytes_ << "/" << total_sent_;
cntx_->ReportError(make_error_code(errc::stream_timeout));
}
}
Expand All @@ -182,7 +180,7 @@ void JournalStreamer::WaitForInflightToComplete() {
}

bool JournalStreamer::IsStalled() const {
return in_flight_bytes_ >= replication_stream_output_limit_cached;
return in_flight_bytes_ + pending_buf_.size() >= replication_stream_output_limit_cached;
}

RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
Expand Down
3 changes: 2 additions & 1 deletion src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class JournalStreamer {

journal::Journal* journal_;
std::vector<uint8_t> pending_buf_;
size_t in_flight_bytes_ = 0;
size_t in_flight_bytes_ = 0, total_sent_ = 0;

time_t last_lsn_time_ = 0;
util::fb2::EventCount waker_;
uint32_t journal_cb_id_{0};
Expand Down

0 comments on commit 7b2603a

Please sign in to comment.