diff --git a/src/rgw/rgw_sal_motr.cc b/src/rgw/rgw_sal_motr.cc index 553793f7458e9..12496873420ad 100644 --- a/src/rgw/rgw_sal_motr.cc +++ b/src/rgw/rgw_sal_motr.cc @@ -55,6 +55,7 @@ static string mp_ns = RGW_OBJ_NS_MULTIPART; static struct m0_ufid_generator ufid_gr; namespace rgw::sal { +static const unsigned MAX_ACC_SIZE = 32 * 1024 * 1024; using ::ceph::encode; using ::ceph::decode; @@ -2664,7 +2665,11 @@ int MotrObject::create_mobj(const DoutPrefixProvider *dpp, uint64_t sz) return rc; } expected_obj_size = sz; - + chunk_io_sz = expected_obj_size; + if (expected_obj_size > MAX_ACC_SIZE) { + // Cap it to MAX_ACC_SIZE + chunk_io_sz = MAX_ACC_SIZE; + } ldpp_dout(dpp, 20) <<__func__<< ": key=" << this->get_key().to_str() << " size=" << sz << " meta:oid=[0x" << std::hex << meta.oid.u_hi << ":0x" << meta.oid.u_lo << "]" << dendl; @@ -2869,6 +2874,7 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer struct m0_bufvec buf; struct m0_bufvec attr; struct m0_indexvec ext; + bool last_io = false; bufferlist data = std::move(in_buffer); @@ -2889,19 +2895,27 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer // We are in data accumulation mode available_data = io_ctxt.total_bufer_sz; } + bs = this->get_optimal_bs(chunk_io_sz); + if (bs < chunk_io_sz) { + chunk_io_sz = bs; + } + int64_t remaining_bytes = + expected_obj_size - processed_bytes; + // Check if this is the last io of the original object size + if (remaining_bytes <= 0) + last_io = true; - bs = this->get_optimal_bs(expected_obj_size); - ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl; + ldpp_dout(dpp, 20) <<__func__<< ": Incoming data=" << left << " bs=" << bs << dendl; if ((left + available_data) < bs) { // Determine if there are any further chunks/bytes from socket to be processed - int64_t remaining_bytes = expected_obj_size - processed_bytes; if (remaining_bytes > 0) { if (io_ctxt.accumulated_buffer_list.size() == 0) { // Save offset io_ctxt.start_offset = offset; } // Append current buffer to the list of accumulated buffers - ldpp_dout(dpp, 20) <<__func__<< " More data (" << remaining_bytes << " bytes) in-flight. Accumulating buffer..." << dendl; + ldpp_dout(dpp, 20) <<__func__<< " More incoming data (" << remaining_bytes + << " bytes) in-flight. Accumulating buffer..." << dendl; io_ctxt.accumulated_buffer_list.push_back(std::move(data)); io_ctxt.total_bufer_sz += left; return 0; @@ -2932,7 +2946,6 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer goto out; } - ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl; if (io_ctxt.accumulated_buffer_list.size() > 0) { // We have IO buffers accumulated. Transform it into single buffer. data.clear(); @@ -2942,20 +2955,22 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer offset = io_ctxt.start_offset; left = data.length(); bs = this->get_optimal_bs(left); - ldpp_dout(dpp, 20) <<__func__<< ": accumulated left=" << left << " bs=" << bs << dendl; + ldpp_dout(dpp, 20) <<__func__<< ": Accumulated data=" << left << " bs=" << bs << dendl; io_ctxt.accumulated_buffer_list.clear(); } else { // No accumulated buffers. + ldpp_dout(dpp, 20) <<__func__<< ": Data=" << left << " bs=" << bs << dendl; } start = data.c_str(); for (p = start; left > 0; left -= bs, p += bs, offset += bs) { - if (left < bs) { + if (left < bs && last_io) { bs = this->get_optimal_bs(left, true); flags |= M0_OOF_LAST; } - if (left < bs) { - ldpp_dout(dpp, 20) <<__func__<< " left ="<< left << ",bs=" << bs << ", Padding [" << (bs - left) << "] bytes" << dendl; + if (left < bs && last_io) { + ldpp_dout(dpp, 20) <<__func__<< " Data ="<< left << ", bs=" << bs << ", Padding [" << (bs - left) + << "] bytes to data" << dendl; data.append_zero(bs - left); p = data.c_str(); } @@ -2965,7 +2980,7 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer ext.iv_vec.v_count[0] = bs; attr.ov_vec.v_count[0] = 0; - ldpp_dout(dpp, 20) <<__func__<< ": write buffer bytes=[" << bs << "], at offset=[" << offset << "]" << dendl; + ldpp_dout(dpp, 20) <<__func__<< ": Write data bytes=[" << bs << "], at offset=[" << offset << "]" << dendl; op = nullptr; this->mobj->ob_entity.en_flags |= M0_ENF_GEN_DI; rc = m0_obj_op(this->mobj, M0_OC_WRITE, &ext, &buf, &attr, 0, flags, &op); @@ -3644,8 +3659,6 @@ int MotrAtomicWriter::write(bool last) return rc; } -static const unsigned MAX_ACC_SIZE = 32 * 1024 * 1024; - // Accumulate enough data first to make a reasonable decision about the // optimal unit size for a new object, or bs for existing object (32M seems // enough for 4M units in 8+2 parity groups, a common config on wide pools), diff --git a/src/rgw/rgw_sal_motr.h b/src/rgw/rgw_sal_motr.h index bb53486b49dc5..befd13545bf45 100644 --- a/src/rgw/rgw_sal_motr.h +++ b/src/rgw/rgw_sal_motr.h @@ -562,6 +562,7 @@ class MotrObject : public Object { uint64_t part_num; // Object size as available from Content-Length header uint64_t expected_obj_size = 0; + uint64_t chunk_io_sz = 0; // Total Number of bytes processed so far uint64_t processed_bytes = 0; struct AccumulateIOCtxt io_ctxt = {};