Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

rgw_sal_motr: [CORTX-34075] Enhance multipart object write IO #408

Merged
merged 1 commit into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions src/rgw/rgw_sal_motr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ static string mp_ns = RGW_OBJ_NS_MULTIPART;
static struct m0_ufid_generator ufid_gr;

namespace rgw::sal {
static const unsigned MAX_ACC_SIZE = 32 * 1024 * 1024;

using ::ceph::encode;
using ::ceph::decode;
Expand Down Expand Up @@ -2664,7 +2665,11 @@ int MotrObject::create_mobj(const DoutPrefixProvider *dpp, uint64_t sz)
return rc;
}
expected_obj_size = sz;

chunk_io_sz = expected_obj_size;
if (expected_obj_size > MAX_ACC_SIZE) {
// Cap it to MAX_ACC_SIZE
chunk_io_sz = MAX_ACC_SIZE;
}
ldpp_dout(dpp, 20) <<__func__<< ": key=" << this->get_key().to_str()
<< " size=" << sz << " meta:oid=[0x" << std::hex
<< meta.oid.u_hi << ":0x" << meta.oid.u_lo << "]" << dendl;
Expand Down Expand Up @@ -2869,6 +2874,7 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer
struct m0_bufvec buf;
struct m0_bufvec attr;
struct m0_indexvec ext;
bool last_io = false;

bufferlist data = std::move(in_buffer);

Expand All @@ -2889,19 +2895,27 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer
// We are in data accumulation mode
available_data = io_ctxt.total_bufer_sz;
}
bs = this->get_optimal_bs(chunk_io_sz);
if (bs < chunk_io_sz) {
chunk_io_sz = bs;
}
int64_t remaining_bytes =
expected_obj_size - processed_bytes;
// Check if this is the last io of the original object size
if (remaining_bytes <= 0)
last_io = true;

bs = this->get_optimal_bs(expected_obj_size);
ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl;
ldpp_dout(dpp, 20) <<__func__<< ": Incoming data=" << left << " bs=" << bs << dendl;
if ((left + available_data) < bs) {
// Determine if there are any further chunks/bytes from socket to be processed
int64_t remaining_bytes = expected_obj_size - processed_bytes;
if (remaining_bytes > 0) {
if (io_ctxt.accumulated_buffer_list.size() == 0) {
// Save offset
io_ctxt.start_offset = offset;
}
// Append current buffer to the list of accumulated buffers
ldpp_dout(dpp, 20) <<__func__<< " More data (" << remaining_bytes << " bytes) in-flight. Accumulating buffer..." << dendl;
ldpp_dout(dpp, 20) <<__func__<< " More incoming data (" << remaining_bytes
<< " bytes) in-flight. Accumulating buffer..." << dendl;
io_ctxt.accumulated_buffer_list.push_back(std::move(data));
io_ctxt.total_bufer_sz += left;
return 0;
Expand Down Expand Up @@ -2932,7 +2946,6 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer
goto out;
}

ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl;
if (io_ctxt.accumulated_buffer_list.size() > 0) {
// We have IO buffers accumulated. Transform it into single buffer.
data.clear();
Expand All @@ -2942,20 +2955,22 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer
offset = io_ctxt.start_offset;
left = data.length();
bs = this->get_optimal_bs(left);
ldpp_dout(dpp, 20) <<__func__<< ": accumulated left=" << left << " bs=" << bs << dendl;
ldpp_dout(dpp, 20) <<__func__<< ": Accumulated data=" << left << " bs=" << bs << dendl;
io_ctxt.accumulated_buffer_list.clear();
} else {
// No accumulated buffers.
ldpp_dout(dpp, 20) <<__func__<< ": Data=" << left << " bs=" << bs << dendl;
}

start = data.c_str();
for (p = start; left > 0; left -= bs, p += bs, offset += bs) {
if (left < bs) {
if (left < bs && last_io) {
bs = this->get_optimal_bs(left, true);
flags |= M0_OOF_LAST;
}
if (left < bs) {
ldpp_dout(dpp, 20) <<__func__<< " left ="<< left << ",bs=" << bs << ", Padding [" << (bs - left) << "] bytes" << dendl;
if (left < bs && last_io) {
ldpp_dout(dpp, 20) <<__func__<< " Data ="<< left << ", bs=" << bs << ", Padding [" << (bs - left)
<< "] bytes to data" << dendl;
data.append_zero(bs - left);
p = data.c_str();
}
Expand All @@ -2965,7 +2980,7 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer
ext.iv_vec.v_count[0] = bs;
attr.ov_vec.v_count[0] = 0;

ldpp_dout(dpp, 20) <<__func__<< ": write buffer bytes=[" << bs << "], at offset=[" << offset << "]" << dendl;
ldpp_dout(dpp, 20) <<__func__<< ": Write data bytes=[" << bs << "], at offset=[" << offset << "]" << dendl;
op = nullptr;
this->mobj->ob_entity.en_flags |= M0_ENF_GEN_DI;
rc = m0_obj_op(this->mobj, M0_OC_WRITE, &ext, &buf, &attr, 0, flags, &op);
Expand Down Expand Up @@ -3644,8 +3659,6 @@ int MotrAtomicWriter::write(bool last)
return rc;
}

static const unsigned MAX_ACC_SIZE = 32 * 1024 * 1024;

// Accumulate enough data first to make a reasonable decision about the
// optimal unit size for a new object, or bs for existing object (32M seems
// enough for 4M units in 8+2 parity groups, a common config on wide pools),
Expand Down
1 change: 1 addition & 0 deletions src/rgw/rgw_sal_motr.h
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ class MotrObject : public Object {
uint64_t part_num;
// Object size as available from Content-Length header
uint64_t expected_obj_size = 0;
uint64_t chunk_io_sz = 0;
// Total Number of bytes processed so far
uint64_t processed_bytes = 0;
struct AccumulateIOCtxt io_ctxt = {};
Expand Down