Skip to content

Commit

Permalink
Merge pull request #5755 from BOINC/dpa_max_concurrent2
Browse files Browse the repository at this point in the history
address issue with max concurrent and work fetch
  • Loading branch information
AenBleidd authored Aug 13, 2024
2 parents 39559b5 + fabdd13 commit af6c23c
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 31 deletions.
2 changes: 1 addition & 1 deletion client/client_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2118,7 +2118,7 @@ int CLIENT_STATE::reset_project(PROJECT* project, bool detaching) {
project->min_rpc_time = 0;
project->pwf.reset(project);
for (int j=0; j<coprocs.n_rsc; j++) {
project->rsc_pwf[j].reset();
project->rsc_pwf[j].reset(j);
}
write_state_file();
return 0;
Expand Down
3 changes: 3 additions & 0 deletions client/makefile_sim
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# makefile for client simulator
# Do "make_clean" in client/, lib/, and sched/ first
#
# this doesn't have .h dependencies; if you change something,
# do make clean and make

CXXFLAGS = -g -DSIM -Wall \
-I ../lib \
Expand Down
4 changes: 2 additions & 2 deletions client/project.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ struct PROJECT : PROJ_AM {
//
RSC_PROJECT_WORK_FETCH rsc_pwf[MAX_RSC];
PROJECT_WORK_FETCH pwf;
inline void reset() {
inline void work_fetch_reset() {
for (int i=0; i<coprocs.n_rsc; i++) {
rsc_pwf[i].reset();
rsc_pwf[i].reset(i);
}
}
inline int deadlines_missed(int rsc_type) {
Expand Down
14 changes: 5 additions & 9 deletions client/rr_sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,6 @@ struct RR_SIM {
}
if (have_max_concurrent) {
max_concurrent_inc(rp);
if (p->rsc_pwf[0].sim_nused > p->rsc_pwf[0].max_nused) {
p->rsc_pwf[0].max_nused = p->rsc_pwf[0].sim_nused;
}
if (rt && p->rsc_pwf[rt].sim_nused > p->rsc_pwf[rt].max_nused) {
p->rsc_pwf[rt].max_nused = p->rsc_pwf[rt].sim_nused;
}
}
}

Expand Down Expand Up @@ -438,9 +432,11 @@ static void mc_update_stats(double sim_now, double dt, double buf_end) {
if (!p->app_configs.project_has_mc) continue;
for (int rt=0; rt<coprocs.n_rsc; rt++) {
RSC_PROJECT_WORK_FETCH& rsc_pwf = p->rsc_pwf[rt];
RSC_WORK_FETCH& rwf = rsc_work_fetch[rt];
double x = rsc_pwf.max_nused - rsc_pwf.sim_nused;
x = std::min(x, rwf.ninstances - rwf.sim_nused);

// x is the number of instances this project isn't using but could
// (given MC constraints)
//
double x = rsc_pwf.mc_max_could_use - rsc_pwf.sim_nused;
if (x > 1e-6 && sim_now < buf_end) {
double dt2;
if (sim_now + dt > buf_end) {
Expand Down
11 changes: 6 additions & 5 deletions client/sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,8 +847,9 @@ void show_resource(int rsc_type) {
bool found = false;
for (i=0; i<gstate.active_tasks.active_tasks.size(); i++) {
ACTIVE_TASK* atp = gstate.active_tasks.active_tasks[i];
RESULT* rp = atp->result;
if (atp->task_state() != PROCESS_EXECUTING) continue;
RESULT* rp = atp->result;
PROJECT* p = rp->project;
double ninst=0;
if (rsc_type) {
if (rp->avp->gpu_usage.rsc_type != rsc_type) continue;
Expand All @@ -857,12 +858,11 @@ void show_resource(int rsc_type) {
ninst = rp->avp->avg_ncpus;
}

PROJECT* p = rp->project;
if (!found) {
found = true;
fprintf(html_out,
"<table>\n"
"<tr><th>#devs</th><th>Job name (* = high priority)</th><th>GFLOPs left</th>%s</tr>\n",
"<tr><th>#devs</th><th>App</th><th>Job name (* = high priority)</th><th>GFLOPs left</th>%s</tr>\n",
rsc_type?"<th>GPU</th>":""
);
}
Expand All @@ -871,8 +871,9 @@ void show_resource(int rsc_type) {
} else {
safe_strcpy(buf, "");
}
fprintf(html_out, "<tr valign=top><td>%.2f</td><td bgcolor=%s><font color=#ffffff>%s%s</font></td><td>%.0f</td>%s</tr>\n",
fprintf(html_out, "<tr valign=top><td>%.2f</td><td>%s</td><td bgcolor=%s><font color=#ffffff>%s%s</font></td><td>%.0f</td>%s</tr>\n",
ninst,
rp->wup->app->name,
colors[p->proj_index%NCOLORS],
rp->edf_scheduled?"*":"",
rp->name,
Expand Down Expand Up @@ -1340,7 +1341,7 @@ void clear_backoff() {
for (i=0; i<gstate.projects.size(); i++) {
PROJECT* p = gstate.projects[i];
for (int j=0; j<coprocs.n_rsc; j++) {
p->rsc_pwf[j].reset();
p->rsc_pwf[j].reset(j);
}
p->min_rpc_time = 0;
}
Expand Down
35 changes: 28 additions & 7 deletions client/work_fetch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,37 @@ inline bool has_coproc_app(PROJECT* p, int rsc_type) {
/////////////// RSC_PROJECT_WORK_FETCH ///////////////

void RSC_PROJECT_WORK_FETCH::rr_init(PROJECT *p) {
unsigned int i;
fetchable_share = 0;
n_runnable_jobs = 0;
sim_nused = 0;
nused_total = 0;
deadlines_missed = 0;
mc_shortfall = 0;
last_mc_limit_reltime = 0;
max_nused = p->app_configs.project_min_mc;
if (p->app_configs.project_has_mc) {
// compute x = max usage over this resource over P's app versions
double x = 0;
for (i=0; i<gstate.app_versions.size(); i++) {
APP_VERSION* avp = gstate.app_versions[i];
if (avp->project != p) continue;
if (rsc_type && (avp->gpu_usage.rsc_type == rsc_type)) {
if (avp->gpu_usage.usage > x) x = avp->gpu_usage.usage;
} else {
if (avp->avg_ncpus > x) x = avp->avg_ncpus;
}
}

// max instances this project could use is (approximately)
// its smallest max concurrent limit times x
// This doesn't take into account e.g. that the MC limit
// could be from a different app than the one that determined x
//
mc_max_could_use = std::min(
p->app_configs.project_min_mc*x,
(double)(rsc_work_fetch[rsc_type].ninstances)
);
}
}

void RSC_PROJECT_WORK_FETCH::resource_backoff(PROJECT* p, const char* name) {
Expand All @@ -98,9 +121,7 @@ void RSC_PROJECT_WORK_FETCH::resource_backoff(PROJECT* p, const char* name) {
// check for backoff must go last, so that if that's the reason
// we know that there are no other reasons (for piggyback)
//
RSC_REASON RSC_PROJECT_WORK_FETCH::compute_rsc_project_reason(
PROJECT *p, int rsc_type
) {
RSC_REASON RSC_PROJECT_WORK_FETCH::compute_rsc_project_reason(PROJECT *p) {
RSC_WORK_FETCH& rwf = rsc_work_fetch[rsc_type];
// see whether work fetch for this resource is banned
// by prefs, config, project, or acct mgr
Expand Down Expand Up @@ -373,7 +394,7 @@ void RSC_WORK_FETCH::clear_request() {

void PROJECT_WORK_FETCH::reset(PROJECT* p) {
for (int i=0; i<coprocs.n_rsc; i++) {
p->rsc_pwf[i].reset();
p->rsc_pwf[i].reset(i);
}
}

Expand Down Expand Up @@ -696,7 +717,7 @@ void WORK_FETCH::setup() {
p->pwf.project_reason = compute_project_reason(p);
for (int j=0; j<coprocs.n_rsc; j++) {
RSC_PROJECT_WORK_FETCH& rpwf = p->rsc_pwf[j];
rpwf.rsc_project_reason = rpwf.compute_rsc_project_reason(p, j);
rpwf.rsc_project_reason = rpwf.compute_rsc_project_reason(p);
}
}
for (int j=0; j<coprocs.n_rsc; j++) {
Expand Down Expand Up @@ -827,7 +848,7 @@ PROJECT* WORK_FETCH::choose_project() {
}
}

// If rsc_index is nonzero, it's a resource that this project
// If rsc_index is non-neg, it's a resource that this project
// can ask for work, and which needs work.
// And this is the highest-priority project having this property.
// Request work from this resource,
Expand Down
15 changes: 10 additions & 5 deletions client/work_fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ typedef long long COPROC_INSTANCE_BITMAP;
// state per (resource, project) pair
//
struct RSC_PROJECT_WORK_FETCH {
int rsc_type;

// the following are persistent (saved in state file)
double backoff_time;
double backoff_interval;
Expand Down Expand Up @@ -121,8 +123,10 @@ struct RSC_PROJECT_WORK_FETCH {

// stuff for max concurrent logic
//
double max_nused;
// max # instances used so far in simulation.
double mc_max_could_use;
// max # instances the project could use,
// given its max concurrent limitations
// (we compute this in a kinda sloppy way)
double mc_shortfall;
// project's shortfall for this resources, given MC limits

Expand All @@ -143,19 +147,20 @@ struct RSC_PROJECT_WORK_FETCH {
pending.clear();
has_deferred_job = false;
rsc_project_reason = RSC_REASON_NONE;
max_nused = 0.0;
mc_max_could_use = 0.0;
mc_shortfall = 0.0;
}

inline void reset() {
inline void reset(int rt) {
rsc_type = rt;
backoff_time = 0;
backoff_interval = 0;
}

inline void reset_rec_accounting() {
secs_this_rec_interval = 0;
}
RSC_REASON compute_rsc_project_reason(PROJECT*, int rsc_type);
RSC_REASON compute_rsc_project_reason(PROJECT*);
void resource_backoff(PROJECT*, const char*);
void rr_init(PROJECT*);
void clear_backoff() {
Expand Down
5 changes: 3 additions & 2 deletions lib/cc_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,10 @@ struct APP_CONFIGS {
std::vector<APP_VERSION_CONFIG> app_version_configs;
int project_max_concurrent;
bool project_has_mc;
// have app- or project-level max concurrent restriction
// the project has app- or project-level restriction
// on # of concurrent jobs
int project_min_mc;
// the min of these restrictions
// if true, the min of these restrictions
bool report_results_immediately;

int parse(XML_PARSER&, MSG_VEC&, LOG_FLAGS&);
Expand Down

0 comments on commit af6c23c

Please sign in to comment.