Skip to content

Commit

Permalink
locally choose predicates to flip. Root only determines the number of…
Browse files Browse the repository at this point in the history
… flips per rank.
  • Loading branch information
JaeseungYeom committed Sep 5, 2024
1 parent 7e6584d commit 7eacdb6
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 79 deletions.
240 changes: 163 additions & 77 deletions src/AMSlib/wf/validator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <iostream>
#include <string>
#include <cstring> // memcpy
#include <set>
#include <vector>
#include <array>
#include <algorithm>
Expand Down Expand Up @@ -146,8 +147,11 @@ class VPCollector {
int gather_predicate(const bool* pred_loc, size_t num_pred_loc);
size_t pick_num_val_pts(const size_t n_T, const size_t n_F, unsigned k) const;
std::vector<size_t> pick_val_pts(unsigned k = 0u);
void turn_predicate_on(const std::vector<size_t>& val_pts);
bool* distribute_predicate(const AMSResourceType appDataLoc);
bool count_local_vals(const std::vector<size_t>& val_pts,
std::vector<unsigned>& num_chosen_per_rank);
bool turn_predicate_on(const unsigned my_val_pt_cnt);
bool* distribute_predicate(const std::vector<unsigned>& local_val_pt_cnts,
const AMSResourceType appDataLoc);
/// Clear intermediate data for root rank to handle predicate update
void clear_intermediate_info();
std::array<FPTypeValue, 2> sum_sqdev(const FPTypeValue* phy_out,
Expand All @@ -163,31 +167,39 @@ class VPCollector {
int m_rank;
int m_num_ranks;
size_t m_num_pred_loc;
/// Local predicate array
const bool* m_pred_loc;
/// indices of positive local predicates
std::vector<size_t> m_pred_loc_pos;
bool* m_pred_loc_new;
AMSResourceType m_appDataLoc;
std::vector<FPTypeValue*> m_sur; ///< surrogate output backup

// For root rank
std::vector<uint8_t> m_predicate_all;
std::vector<int> m_num_pred_all;
std::vector<int> m_rcnts;
std::vector<int> m_displs;
/// The total number of predicates
size_t m_tot_num_preds;
/// The total number of positive predicates
size_t m_tot_num_preds_pos;
/// number of predicates and the positive ones of each rank
std::vector<unsigned> m_num_pred_all;
/// displacement of positive predicates across ranks
std::vector<size_t> m_displs_pos;
std::default_random_engine m_rng;
};

/// Reset the intermediate bookkeeping used while selecting validation points:
/// every container touched below is emptied and the total / positive
/// predicate counters are set back to zero.
template <typename FPTypeValue>
void VPCollector<FPTypeValue>::clear_intermediate_info()
{
m_predicate_all.clear();
// Totals accumulated in gather_predicate()
m_tot_num_preds = 0u;
m_tot_num_preds_pos = 0u;
m_num_pred_all.clear();
m_rcnts.clear();
m_displs.clear();
m_displs_pos.clear();
}

template <typename FPTypeValue>
VPCollector<FPTypeValue>::VPCollector(unsigned seed, MPI_Comm comm)
: m_comm(comm), m_num_pred_loc(0ul), m_pred_loc(nullptr), m_pred_loc_new(nullptr)
: m_comm(comm), m_num_pred_loc(0u), m_pred_loc(nullptr), m_pred_loc_new(nullptr),
m_tot_num_preds(0u), m_tot_num_preds_pos(0u)
{
MPI_Comm_rank(comm, &m_rank);
MPI_Comm_size(comm, &m_num_ranks);
Expand Down Expand Up @@ -225,19 +237,29 @@ int VPCollector<FPTypeValue>::gather_predicate(const bool* pred_loc, size_t num_
{
m_pred_loc = pred_loc;
m_num_pred_loc = num_pred_loc;
int cnt_loc = static_cast<int>(num_pred_loc);
int cnt_all = 0;
int rc = 0;
int rc = MPI_SUCCESS;

m_pred_loc_pos.clear();
m_pred_loc_pos.reserve(m_num_pred_loc);

for (size_t i = 0u; i < m_num_pred_loc; ++i) {
if (m_pred_loc[i]) m_pred_loc_pos.push_back(i);
}

std::vector<unsigned> cnt_loc = {static_cast<unsigned>(m_num_pred_loc),
static_cast<unsigned>(m_pred_loc_pos.size())};

m_num_pred_all.clear();
m_num_pred_all.resize(m_num_ranks);
if (m_rank == 0) {
m_num_pred_all.resize(m_num_ranks*2);
}
// Gather the data sizes (i.e., the number of items) from each rank
rc = MPI_Gather(reinterpret_cast<const void*>(&cnt_loc),
1,
MPI_INT,
rc = MPI_Gather(reinterpret_cast<const void*>(cnt_loc.data()),
2,
MPI_UNSIGNED,
reinterpret_cast<void*>(m_num_pred_all.data()),
1,
MPI_INT,
2,
MPI_UNSIGNED,
0,
m_comm);

Expand All @@ -249,39 +271,24 @@ int VPCollector<FPTypeValue>::gather_predicate(const bool* pred_loc, size_t num_
return rc;
}

m_displs.clear();
m_rcnts.clear();
m_displs_pos.clear();

if (m_rank == 0) {
m_displs.resize(m_num_ranks);
m_rcnts.resize(m_num_ranks);
m_displs_pos.resize(m_num_ranks);

int offset = 0;
size_t offset = 0u;
size_t offset_pos = 0u;
auto it = m_num_pred_all.cbegin();
for (int i = 0; i < m_num_ranks; ++i) {
m_displs[i] = offset;
offset += (m_rcnts[i] = m_num_pred_all[i]);
}
m_predicate_all.resize(cnt_all = offset);
}

rc = MPI_Gatherv(reinterpret_cast<const void*>(pred_loc),
cnt_loc,
MPI_C_BOOL,
reinterpret_cast<void*>(m_predicate_all.data()),
m_rcnts.data(),
m_displs.data(),
MPI_UINT8_T,
0,
m_comm);

if (rc != MPI_SUCCESS) {
if (m_rank == 0) {
std::cerr << "MPI_Gatherv() in gather_predicate() failed with code ("
<< rc << ")" << std::endl;
m_displs_pos[i] = offset_pos;
offset += static_cast<size_t>(*it++);
offset_pos += static_cast<size_t>(*it++);
}
m_tot_num_preds = offset;
m_tot_num_preds_pos = offset_pos;
}

return rc;
return MPI_SUCCESS;
}

/// Determine the number of points to evaluate physics model on while leveraging workers idle due to load imbalance
Expand All @@ -295,34 +302,97 @@ size_t VPCollector<FPTypeValue>::pick_num_val_pts(const size_t n_T, const size_t
return n_val;
}

/// Randomly choose the points to run physics on out of those accepted with the surrogate
/** Randomly choose the points to run physics on, out of those accepted with
the surrogate.
`k_v': the minimum number of new physics evaluation points per rank.
By default, it is 0. Each rank will have k_v or k_v+1 extra physics
evaluation points. */

template <typename FPTypeValue>
std::vector<size_t> VPCollector<FPTypeValue>::pick_val_pts(unsigned k)
std::vector<size_t> VPCollector<FPTypeValue>::pick_val_pts(unsigned k_v)
{
std::vector<size_t> accepted; // positions of accepted surrogate values
accepted.reserve(m_predicate_all.size());
for (size_t i = 0ul; i < m_predicate_all.size(); ++i) {
if (m_predicate_all[i]) {
accepted.push_back(i);
}
// Nothing to choose from when no predicate is positive.
if (m_tot_num_preds_pos < 1u) {
return std::vector<size_t>{};
}
const size_t num_val = pick_num_val_pts(accepted.size(), m_predicate_all.size()-accepted.size(), k);
std::shuffle(accepted.begin(), accepted.end(), m_rng);
std::sort(accepted.begin(), accepted.begin() + num_val);

return std::vector<size_t>(accepted.cbegin(), accepted.cbegin() + num_val);
// Arguments: number of positive predicates, number of non-positive ones, k_v.
const size_t num_vals = pick_num_val_pts(m_tot_num_preds_pos,
m_tot_num_preds - m_tot_num_preds_pos,
k_v);

// Draws are ordinals in [0, m_tot_num_preds_pos-1], i.e. positions within the
// sequence of positive predicates rather than positions among all predicates.
std::uniform_int_distribution<size_t> dist(0u, m_tot_num_preds_pos-1);

// The set discards duplicate draws and keeps the chosen ordinals in
// ascending order.
// NOTE(review): this loop can only finish if num_vals <= m_tot_num_preds_pos;
// presumably pick_num_val_pts() guarantees that -- confirm.
std::set<size_t> chosen;
while (chosen.size() < num_vals) {
chosen.insert(dist(m_rng));
}

return std::vector<size_t>(chosen.cbegin(), chosen.cend());
}

/** We could have translated the indices of the chosen predicates into local
 * indices on their owner ranks and then sent those indices back to each rank.
 * However, as each rank can own a varying number of the predicates chosen to
 * flip, that would require sending, in advance, the number of indices each
 * rank should expect to receive. To save MPI communication, we simply send
 * the count to each rank and let each rank locally choose that many
 * predicates to flip. */
template <typename FPTypeValue>
void VPCollector<FPTypeValue>::turn_predicate_on(const std::vector<size_t>& val_pts)
bool VPCollector<FPTypeValue>::count_local_vals(const std::vector<size_t>& val_pts,
std::vector<unsigned>& num_chosen_per_rank)
{
for (const auto i: val_pts) {
m_predicate_all[i] = static_cast<uint8_t>(0);
// Start from a zero count for every rank.
num_chosen_per_rank.assign(m_num_ranks, 0u);
int cur_rank = 0;

// cur_rank only ever moves forward, so this single pass relies on val_pts
// being in ascending order.
for (auto idx: val_pts) {
// NOTE(review): m_displs_pos is resized to m_num_ranks entries in
// gather_predicate(), so m_displs_pos[cur_rank+1] is read one past the end
// when cur_rank is the last rank, and that read happens before the bound
// check below. Indices owned by the last rank appear to hit this -- confirm.
while (idx >= m_displs_pos[cur_rank+1]) {
cur_rank ++;
if (cur_rank >= m_num_ranks) {
std::cerr << "invalid predicate index!" << std::endl;
return false;
}
}
num_chosen_per_rank[cur_rank]++;
}
return true;
}

/** Build the updated local predicate array in `m_pred_loc_new'.
 *
 *  The current local predicates (`m_pred_loc') are copied into
 *  `m_pred_loc_new', and then `my_val_pt_cnt' of the locally positive
 *  predicates, picked at random with `m_rng', are set to false there so that
 *  those points become validation points.
 *
 *  @param my_val_pt_cnt  number of positive local predicates to unset
 *  @return false if more points are requested than there are positive local
 *          predicates, or if a required buffer is missing; true otherwise.
 */
template <typename FPTypeValue>
bool VPCollector<FPTypeValue>::turn_predicate_on(const unsigned my_val_pt_cnt)
{
  if (my_val_pt_cnt > m_pred_loc_pos.size()) {
    return false;
  }

  // Always populate the output array, even when nothing is to be flipped;
  // otherwise a rank that receives a zero count would hand back a buffer
  // that was never filled in.
  if (m_num_pred_loc > 0u) {
    if (m_pred_loc == nullptr || m_pred_loc_new == nullptr) {
      return false;
    }
    std::memcpy(m_pred_loc_new, m_pred_loc, m_num_pred_loc * sizeof(bool));
  }

  if (my_val_pt_cnt == 0u) {
    return true;
  }

  // m_pred_loc_pos holds the local array indices of the positive predicates.
  // After shuffling a copy, its first `my_val_pt_cnt' entries are a random
  // subset of those indices, and each can be unset directly.
  auto pred_idx = m_pred_loc_pos;
  std::shuffle(pred_idx.begin(), pred_idx.end(), m_rng);

  for (unsigned n_v = 0u; n_v < my_val_pt_cnt; ++n_v) {
    m_pred_loc_new[pred_idx[n_v]] = false;
  }

  return true;
}

template <typename FPTypeValue>
bool* VPCollector<FPTypeValue>::distribute_predicate(
const std::vector<unsigned>& local_val_pt_cnts,
const AMSResourceType appDataLoc)
{
int rc = 0;
Expand All @@ -342,14 +412,14 @@ bool* VPCollector<FPTypeValue>::distribute_predicate(
return nullptr;
}

rc = MPI_Scatterv(
reinterpret_cast<const void*>(m_predicate_all.data()),
reinterpret_cast<const int*>(m_rcnts.data()),
reinterpret_cast<const int*>(m_displs.data()),
MPI_UINT8_T,
reinterpret_cast<void*>(m_pred_loc_new),
static_cast<int>(m_num_pred_loc),
MPI_C_BOOL,
unsigned my_val_pt_cnt = 0u;
rc = MPI_Scatter(
reinterpret_cast<const void*>(local_val_pt_cnts.data()),
1,
MPI_UNSIGNED,
reinterpret_cast<void*>(&my_val_pt_cnt),
1,
MPI_UNSIGNED,
0, m_comm);

if (rc != MPI_SUCCESS) {
Expand All @@ -364,8 +434,22 @@ bool* VPCollector<FPTypeValue>::distribute_predicate(
auto &rm_d = ams::ResourceManager::getInstance();
rm_d.deallocate(m_pred_loc_new, appDataLoc);
#endif // __STANDALONE_TEST__
m_pred_loc_new = nullptr;

return nullptr;
}
clear_intermediate_info();
// Unset the predicate for those selected as validation points
if (!turn_predicate_on(my_val_pt_cnt)) {
std::cerr << "Failed to turn predicates on for extra validation!" << std::endl;
#if __STANDALONE_TEST__
ams::ResourceManager::deallocate(m_pred_loc_new, appDataLoc);
#else
auto &rm_d = ams::ResourceManager::getInstance();
rm_d.deallocate(m_pred_loc_new, appDataLoc);
#endif // __STANDALONE_TEST__
m_pred_loc_new = nullptr;
}

return m_pred_loc_new;
}
Expand All @@ -383,24 +467,26 @@ bool VPCollector<FPTypeValue>::set_validation(
return false;
}

std::vector<unsigned> local_val_pt_cnts;
if (m_rank == 0) {
std::vector<size_t> val_pts_all = pick_val_pts(k_v);
// indices of predicates to flip for enabling validation
std::vector<size_t> pred_idx = pick_val_pts(k_v);
#if 1
std::cout << "validation points: size(" << val_pts_all.size() << ")";
for (size_t i = 0ul; i < val_pts_all.size(); i++) {
std::cout << ' ' << val_pts_all[i];
std::cout << "validation points: size(" << pred_idx.size() << ")";
for (size_t i = 0ul; i < pred_idx.size(); i++) {
std::cout << ' ' << pred_idx[i];
}
std::cout << std::endl;
#endif
// Unset the predicate for those selected as validation points
turn_predicate_on(val_pts_all);
if (!count_local_vals(pred_idx, local_val_pt_cnts)) {
return false;
}
}

// distribute updated predicate
if (!distribute_predicate(appDataLoc)) {
if (!distribute_predicate(local_val_pt_cnts, appDataLoc)) {
return false;
}
clear_intermediate_info();

return true;
}
Expand Down Expand Up @@ -614,7 +700,7 @@ std::vector<ValStats<FPTypeValue>> VPCollector<FPTypeValue>::get_error_stats(
err_avg_glo[j][0] = err_sum_glo[j][0] / static_cast<FPTypeValue>(err_cnt_glo[0]);
err_avg_glo[j][1] = err_sum_glo[j][1] / static_cast<FPTypeValue>(err_cnt_glo[1]);
std::array<FPTypeValue, 2> err_var_loc =
sum_sqdev(phy_out[j], step_valpoints[j], err_avg_glo[j], j);
sum_sqdev(phy_out[j], m_sur[j], step_valpoints[j], err_avg_glo[j]);
//err_var_loc[0] /= static_cast<FPTypeValue>(err_cnt_glo[0]);
//err_var_loc[1] /= static_cast<FPTypeValue>(err_cnt_glo[1]);
MPI_Allreduce(&err_var_loc, &err_var_glo[j], 1, mpi_dtype, MPI_SUM, m_comm);
Expand Down
4 changes: 2 additions & 2 deletions src/AMSlib/wf/workflow.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,8 @@ class AMSWorkflow
auto& valstep = valsteps.back();

set_exec_policy(AMSExecPolicy::AMS_BALANCED);
VPCollector<FPTypeValue> vpcol(val_seed, comm);
if (should_load_balance()) {
VPCollector<FPTypeValue> vpcol(val_seed, comm);
vpcol.set_validation(predicate, totalElements, appDataLoc, min_val_pts);

// Backup the surrogate outputs for those selected to compute physics
Expand Down Expand Up @@ -482,7 +482,7 @@ class AMSWorkflow
// Compute the error statistics between surrogate and physics model outputs
//------------------------------------------------------------------
if (should_load_balance()) {
auto stats = get_error_stats(origOutputs, valstep, comm);
auto stats = vpcol.get_error_stats(origOutputs, valstep);
for (int i = 0; i < outputDim; i++) {
CINFO(ErrorStats,
rId == 0,
Expand Down

0 comments on commit 7eacdb6

Please sign in to comment.