Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and Improve fix pg size & shard identical layout #235

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.12"
version = "2.1.13"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
9 changes: 5 additions & 4 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const
// Both chunk_num_t and pg_id_t are of type uint16_t.
static_assert(std::is_same< pg_id_t, uint16_t >::value, "pg_id_t is not uint16_t");
static_assert(std::is_same< homestore::chunk_num_t, uint16_t >::value, "chunk_num_t is not uint16_t");
uint32_t application_hint = hint.application_hint.value();
auto application_hint = hint.application_hint.value();
pg_id_t pg_id = (uint16_t)(application_hint >> 16 & 0xFFFF);
homestore::chunk_num_t v_chunk_id = (uint16_t)(application_hint & 0xFFFF);
return select_specific_chunk(pg_id, v_chunk_id);
Expand Down Expand Up @@ -129,12 +129,13 @@ std::optional< uint32_t > HeapChunkSelector::select_chunks_for_pg(pg_id_t pg_id,
LOGWARNMOD(homeobject, "PG had already created, pg_id {}", pg_id);
return std::nullopt;
}
if (pg_size == 0) {
LOGWARNMOD(homeobject, "Not supported to create empty PG, pg_id {}, pg_size {}", pg_id, pg_size);

const auto chunk_size = get_chunk_size();
if (pg_size < chunk_size) {
LOGWARNMOD(homeobject, "pg_size {} is less than chunk_size {}", pg_size, chunk_size);
return std::nullopt;
}

const auto chunk_size = get_chunk_size();
const uint32_t num_chunk = sisl::round_down(pg_size, chunk_size) / chunk_size;

// Select a pdev with the most available num chunk
Expand Down
8 changes: 6 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ class HSReplApplication : public homestore::ReplApplication {
//
// This should assert if we can not initialize HomeStore.
//
uint64_t HSHomeObject::_hs_chunk_size = HS_CHUNK_SIZE;

DevType HSHomeObject::get_device_type(string const& devname) {
const iomgr::drive_type dtype = iomgr::DriveInterface::get_drive_type(devname);
if (dtype == iomgr::drive_type::block_hdd || dtype == iomgr::drive_type::file_on_hdd) { return DevType::HDD; }
Expand Down Expand Up @@ -168,7 +170,8 @@ void HSHomeObject::init_homestore() {
{HS_SERVICE::REPLICATION,
hs_format_params{.dev_type = HSDevType::Data,
.size_pct = 99.0,
.num_chunks = 60000,
.num_chunks = 0,
.chunk_size = _hs_chunk_size,
.block_size = _data_block_size,
.alloc_type = blk_allocator_type_t::append,
.chunk_sel_type = chunk_selector_type_t::CUSTOM}},
Expand All @@ -185,7 +188,8 @@ void HSHomeObject::init_homestore() {
{HS_SERVICE::REPLICATION,
hs_format_params{.dev_type = run_on_type,
.size_pct = 79.0,
.num_chunks = 60000,
.num_chunks = 0,
.chunk_size = _hs_chunk_size,
.block_size = _data_block_size,
.alloc_type = blk_allocator_type_t::append,
.chunk_sel_type = chunk_selector_type_t::CUSTOM}},
Expand Down
5 changes: 4 additions & 1 deletion src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ BlobError toBlobError(homestore::ReplServiceError const&);
ShardError toShardError(homestore::ReplServiceError const&);

class HSHomeObject : public HomeObjectImpl {
private:
/// NOTE: Be wary to change these as they effect on-disk format!
inline static auto const _svc_meta_name = std::string("HomeObject");
inline static auto const _pg_meta_name = std::string("PGManager");
inline static auto const _shard_meta_name = std::string("ShardManager");
static constexpr uint64_t HS_CHUNK_SIZE = 2 * Gi;
static constexpr uint32_t _data_block_size = 1024;
static uint64_t _hs_chunk_size;
///

/// Overridable Helpers
Expand Down Expand Up @@ -244,7 +247,7 @@ class HSHomeObject : public HomeObjectImpl {

struct HS_Shard : public Shard {
homestore::superblk< shard_info_superblk > sb_;
HS_Shard(ShardInfo info, homestore::chunk_num_t p_chunk_id);
HS_Shard(ShardInfo info, homestore::chunk_num_t p_chunk_id, homestore::chunk_num_t v_chunk_id);
HS_Shard(homestore::superblk< shard_info_superblk >&& sb);
~HS_Shard() override = default;

Expand Down
11 changes: 6 additions & 5 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set<
auto pg_id = pg_info.id;
if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) return folly::Unit();

if (pg_info.size == 0) {
LOGW("Not supported to create empty PG, pg_id {}, pg_size {}", pg_id, pg_info.size);
const auto most_avail_num_chunks = chunk_selector()->most_avail_num_chunks();
const auto chunk_size = chunk_selector()->get_chunk_size();
if (pg_info.size < chunk_size) {
LOGW("Not support to create PG which pg_size {} < chunk_size {}", pg_info.size, chunk_size);
return folly::makeUnexpected(PGError::INVALID_ARG);
}

const auto most_avail_num_chunks = chunk_selector()->most_avail_num_chunks();
const auto chunk_size = chunk_selector()->get_chunk_size();
const auto needed_num_chunks = sisl::round_down(pg_info.size, chunk_size) / chunk_size;
if (needed_num_chunks > most_avail_num_chunks) {
LOGW("No enough space to create pg, pg_id {}, needed_num_chunks {}, most_avail_num_chunks {}", pg_id,
Expand Down Expand Up @@ -173,7 +173,8 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he
RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found");
index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table};

LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str);
LOGI("create pg {} successfully, index table uuid={} pg_size={} num_chunk={}", pg_id, uuid_str, pg_info.size,
num_chunk.value());
hs_pg->index_table_ = index_table;
// Add to index service, so that it gets cleaned up when index service is shutdown.
homestore::hs()->index_service().add_index_table(index_table);
Expand Down
10 changes: 7 additions & 3 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
case ReplicationMessageType::CREATE_SHARD_MSG: {
auto sb = r_cast< shard_info_superblk const* >(h.cbytes() + sizeof(ReplicationMessageHeader));
auto shard_info = sb->info;
auto v_chunk_id = sb->v_chunk_id;
shard_info.lsn = lsn;

bool shard_exist = false;
Expand All @@ -351,11 +352,12 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
shard_exist = (_shard_map.find(shard_info.id) != _shard_map.end());
}
if (!shard_exist) {
add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, blkids.chunk_num()));
add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, blkids.chunk_num(), v_chunk_id));
// select_specific_chunk() will do something only when we are relaying journal after restart, during the
// runtime flow chunk is already been be mark busy when we write the shard info to the repldev.
auto pg_id = shard_info.placement_group;
chunk_selector_->select_specific_chunk(pg_id, blkids.chunk_num());
auto chunk = chunk_selector_->select_specific_chunk(pg_id, v_chunk_id);
RELEASE_ASSERT(chunk != nullptr, "chunk selection failed with v_chunk_id: {} in PG: {}", v_chunk_id, pg_id);
}
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }

Expand Down Expand Up @@ -524,11 +526,13 @@ bool HSHomeObject::release_chunk_based_on_create_shard_message(sisl::blob const&
}
}

HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t p_chunk_id) :
HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t p_chunk_id,
homestore::chunk_num_t v_chunk_id) :
Shard(std::move(shard_info)), sb_(_shard_meta_name) {
sb_.create(sizeof(shard_info_superblk));
sb_->info = info;
sb_->p_chunk_id = p_chunk_id;
sb_->v_chunk_id = v_chunk_id;
sb_.write();
}

Expand Down
5 changes: 4 additions & 1 deletion src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t

case ReplicationMessageType::SEAL_SHARD_MSG: {
auto p_chunkID = home_object_->get_shard_p_chunk_id(msg_header->shard_id);
RELEASE_ASSERT(p_chunkID.has_value(), "unknown shard id to get binded chunk");
if (!p_chunkID.has_value()) {
LOGW("shard does not exist, underlying engine will retry this later", msg_header->shard_id);
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}
homestore::blk_alloc_hints hints;
hints.chunk_id_hint = p_chunkID.value();
return hints;
Expand Down
14 changes: 14 additions & 0 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class HomeObjectFixture : public ::testing::Test {
HomeObjectFixture() : rand_blob_size{1u, 16 * 1024}, rand_user_key_size{1u, 1024} {}

void SetUp() override {
HSHomeObject::_hs_chunk_size = 20 * Mi;
_obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->build_new_homeobject());
g_helper->sync();
}
Expand Down Expand Up @@ -126,6 +127,19 @@ class HomeObjectFixture : public ::testing::Test {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

// set v_chunk_id to IPC
run_on_pg_leader(pg_id, [&]() {
auto v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(v_chunkID.has_value(), "failed to get shard v_chunk_id");
g_helper->set_v_chunk_id(v_chunkID.value());
});

// get v_chunk_id from IPC and compare with local
auto leader_v_chunk_id = g_helper->get_v_chunk_id();
auto local_v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(local_v_chunkID.has_value(), "failed to get shard v_chunk_id");
RELEASE_ASSERT(leader_v_chunk_id == local_v_chunkID, "v_chunk_id supposed to be identical");

auto r = _obj_inst->shard_manager()->get_shard(shard_id).get();
RELEASE_ASSERT(!!r, "failed to get shard {}", shard_id);
return r.value();
Expand Down
39 changes: 39 additions & 0 deletions src/lib/homestore_backend/tests/hs_pg_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,45 @@ TEST_F(HomeObjectFixture, PGExceedSpaceTest) {
}
}

TEST_F(HomeObjectFixture, PGSizeLessThanChunkTest) {
LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num());
g_helper->sync();
pg_id_t pg_id{1};
if (0 == g_helper->replica_num()) { // leader
auto memebers = g_helper->members();
auto name = g_helper->name();
auto info = homeobject::PGInfo(pg_id);
info.size = 1; // less than chunk size
for (const auto& member : memebers) {
if (0 == member.second) {
// by default, leader is the first member
info.members.insert(homeobject::PGMember{member.first, name + std::to_string(member.second), 1});
} else {
info.members.insert(homeobject::PGMember{member.first, name + std::to_string(member.second), 0});
}
}
auto p = _obj_inst->pg_manager()->create_pg(std::move(info)).get();
ASSERT_TRUE(p.hasError());
PGError error = p.error();
ASSERT_EQ(PGError::INVALID_ARG, error);
} else {
auto start_time = std::chrono::steady_clock::now();
bool res = true;
// follower need to wait for pg creation
while (!pg_exist(pg_id)) {
auto current_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast< std::chrono::seconds >(current_time - start_time).count();
if (duration >= 20) {
LOGINFO("Failed to create pg {} at follower", pg_id);
res = false;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
ASSERT_FALSE(res);
}
}

TEST_F(HomeObjectFixture, PGRecoveryTest) {
// create 10 pg
for (pg_id_t i = 1; i < 11; i++) {
Expand Down
17 changes: 17 additions & 0 deletions src/lib/homestore_backend/tests/hs_repl_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace bip = boost::interprocess;
using namespace homeobject;

#define INVALID_UINT64_ID UINT64_MAX
#define INVALID_CHUNK_NUM UINT16_MAX

namespace test_common {

Expand All @@ -56,6 +57,7 @@ class HSReplTestHelper {
sync_point_num_ = sync_point;
homeobject_replica_count_ = 0;
uint64_id_ = INVALID_UINT64_ID;
v_chunk_id_ = INVALID_CHUNK_NUM;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: I prefer to cast homestore::chunk_num_t(uinit_16) to uint_64 here, and then cast it back.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll merge first, and if necessary, I will refine in next pr.
BTW, what are the pros in this way? 😃

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hs_repl_test_helper.hpp is the framework, it does not know what is a shard_id_t , pg_id_t , blob_id_t and chunk_id_t. these types are only know application layer.

if we add v_chunk_id here, should we also add shard_id_t, pg_id_t and others?

so I think we can use this 64bit as a common shared data across different processed, and application layer should translate this to what it wants

cv_.notify_all();
} else {
cv_.wait(lg, [this, sync_point]() { return sync_point_num_ == sync_point; });
Expand All @@ -72,6 +74,16 @@ class HSReplTestHelper {
return uint64_id_;
}

void set_v_chunk_id(homestore::chunk_num_t input_v_chunk_id) {
std::unique_lock< bip::interprocess_mutex > lg(mtx_);
v_chunk_id_ = input_v_chunk_id;
}

homestore::chunk_num_t get_v_chunk_id() {
std::unique_lock< bip::interprocess_mutex > lg(mtx_);
return v_chunk_id_;
}

private:
bip::interprocess_mutex mtx_;
bip::interprocess_condition cv_;
Expand All @@ -80,6 +92,9 @@ class HSReplTestHelper {
// the following variables are used to share shard_id and blob_id among different replicas
uint64_t uint64_id_{0};

// used to verify identical layout
homestore::chunk_num_t v_chunk_id_{0};

// the nth synchronization point, that is how many times different replicas have synced
uint64_t sync_point_num_{UINT64_MAX};
};
Expand Down Expand Up @@ -256,6 +271,8 @@ class HSReplTestHelper {
void sync() { ipc_data_->sync(sync_point_num++, total_replicas_nums_); }
void set_uint64_id(uint64_t uint64_id) { ipc_data_->set_uint64_id(uint64_id); }
uint64_t get_uint64_id() { return ipc_data_->get_uint64_id(); }
void set_v_chunk_id(homestore::chunk_num_t v_chunk_id) { ipc_data_->set_v_chunk_id(v_chunk_id); }
homestore::chunk_num_t get_v_chunk_id() { return ipc_data_->get_v_chunk_id(); }

void check_and_kill(int port) {
std::string command = "lsof -t -i:" + std::to_string(port);
Expand Down
71 changes: 61 additions & 10 deletions src/lib/homestore_backend/tests/hs_shard_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,30 @@ TEST_F(HomeObjectFixture, CreateMultiShardsOnMultiPG) {

for (const auto pg : pgs) {
auto shard_info = create_shard(pg, Mi);
auto chunk_num_1 = _obj_inst->get_shard_p_chunk_id(shard_info.id);
ASSERT_TRUE(chunk_num_1.has_value());
auto p_chunk_ID1 = _obj_inst->get_shard_p_chunk_id(shard_info.id);
auto v_chunk_ID1 = _obj_inst->get_shard_v_chunk_id(shard_info.id);
ASSERT_TRUE(p_chunk_ID1.has_value());
ASSERT_TRUE(v_chunk_ID1.has_value());

// create another shard again.
shard_info = create_shard(pg, Mi);
auto chunk_num_2 = _obj_inst->get_shard_p_chunk_id(shard_info.id);
ASSERT_TRUE(chunk_num_2.has_value());
auto p_chunk_ID2 = _obj_inst->get_shard_p_chunk_id(shard_info.id);
auto v_chunk_ID2 = _obj_inst->get_shard_v_chunk_id(shard_info.id);
ASSERT_TRUE(p_chunk_ID2.has_value());
ASSERT_TRUE(v_chunk_ID2.has_value());

// v_chunk_id should not same
ASSERT_NE(v_chunk_ID1.value(), v_chunk_ID2.value());
// check if both chunk is on the same pg and pdev;
auto chunks = _obj_inst->chunk_selector()->m_chunks;
ASSERT_TRUE(chunks.find(chunk_num_1.value()) != chunks.end());
ASSERT_TRUE(chunks.find(chunk_num_2.value()) != chunks.end());
auto chunk_1 = chunks[chunk_num_1.value()];
auto chunk_2 = chunks[chunk_num_2.value()];
ASSERT_TRUE(chunks.find(p_chunk_ID1.value()) != chunks.end());
ASSERT_TRUE(chunks.find(p_chunk_ID2.value()) != chunks.end());
auto chunk_1 = chunks[p_chunk_ID1.value()];
auto chunk_2 = chunks[p_chunk_ID2.value()];
ASSERT_TRUE(chunk_1->m_pg_id.has_value());
ASSERT_TRUE(chunk_2->m_pg_id.has_value());
ASSERT_TRUE(chunk_1->m_pg_id.value() == chunk_2->m_pg_id.value());
ASSERT_TRUE(chunk_1->get_pdev_id() == chunk_2->get_pdev_id());
ASSERT_EQ(chunk_1->m_pg_id.value(), chunk_2->m_pg_id.value());
ASSERT_EQ(chunk_1->get_pdev_id(), chunk_2->get_pdev_id());
}
}

Expand All @@ -64,6 +70,51 @@ TEST_F(HomeObjectFixture, SealShard) {
// seal the shard
shard_info = seal_shard(shard_info.id);
ASSERT_EQ(ShardInfo::State::SEALED, shard_info.state);

// seal the shard again
shard_info = seal_shard(shard_info.id);
ASSERT_EQ(ShardInfo::State::SEALED, shard_info.state);

// create shard until no space left, we have 5 chunks in one pg.
for (auto i = 0; i < 5; i++) {
shard_info = create_shard(pg_id, 64 * Mi);
ASSERT_EQ(ShardInfo::State::OPEN, shard_info.state);
}

{
g_helper->sync();

// expect to create shard failed
run_on_pg_leader(pg_id, [&]() {
auto s = _obj_inst->shard_manager()->create_shard(pg_id, 64 * Mi).get();
ASSERT_TRUE(s.hasError());
ASSERT_EQ(ShardError::NO_SPACE_LEFT, s.error());
});

auto start_time = std::chrono::steady_clock::now();
bool res = true;

while (g_helper->get_uint64_id() == INVALID_UINT64_ID) {
auto current_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast< std::chrono::seconds >(current_time - start_time).count();
if (duration >= 20) {
LOGINFO("Failed to create shard at pg {}", pg_id);
res = false;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
ASSERT_FALSE(res);
}

shard_info = seal_shard(shard_info.id);
ASSERT_EQ(ShardInfo::State::SEALED, shard_info.state);

shard_info = create_shard(pg_id, 64 * Mi);
ASSERT_EQ(ShardInfo::State::OPEN, shard_info.state);

shard_info = seal_shard(shard_info.id);
ASSERT_EQ(ShardInfo::State::SEALED, shard_info.state);
}

TEST_F(HomeObjectFixture, ShardManagerRecovery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class HeapChunkSelectorTest : public ::testing::Test {
const uint32_t chunk_size = HCS.get_chunk_size();
const u_int64_t pg_size = chunk_size * 3;
for (uint16_t pg_id = 1; pg_id < 4; ++pg_id) {
// not supported to create empty pg
// not supported to create pg which pg_size < chunk_size
ASSERT_FALSE(HCS.select_chunks_for_pg(pg_id, 0).has_value());
ASSERT_EQ(HCS.select_chunks_for_pg(pg_id, pg_size).value(), 3);
uint32_t last_pdev_id = 0;
Expand Down
Loading