Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust heartbeat behavior #180

Merged
merged 6 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -3734,6 +3734,9 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection)
return recollect;
}

extern int jl_heartbeat_pause(void);
extern int jl_heartbeat_resume(void);

JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
{
JL_PROBE_GC_BEGIN(collection);
Expand Down Expand Up @@ -3775,6 +3778,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
// existence of the thread in the jl_n_threads count.
//
// TODO: concurrently queue objects
jl_heartbeat_pause();
jl_fence();
gc_n_threads = jl_atomic_load_acquire(&jl_n_threads);
gc_all_tls_states = jl_atomic_load_relaxed(&jl_all_tls_states);
Expand Down Expand Up @@ -3806,6 +3810,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)

gc_n_threads = 0;
gc_all_tls_states = NULL;
jl_heartbeat_resume();
jl_safepoint_end_gc();
jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING);
JL_PROBE_GC_END();
Expand Down
16 changes: 15 additions & 1 deletion src/stackwalk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1166,10 +1166,20 @@ JL_DLLEXPORT void jl_print_backtrace(void) JL_NOTSAFEPOINT
}

extern int gc_first_tid;
extern int jl_inside_heartbeat_thread(void);
extern int jl_heartbeat_pause(void);
extern int jl_heartbeat_resume(void);

// Print backtraces for all live tasks, for all threads, to jl_safe_printf stderr
// Print backtraces for all live tasks, for all threads, to jl_safe_printf
// stderr. This can take a _long_ time!
JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT
{
// disable heartbeats to prevent heartbeat loss while running this,
// unless this is called from the heartbeat thread
if (!jl_inside_heartbeat_thread()) {
jl_heartbeat_pause();
}
NHDaly marked this conversation as resolved.
Show resolved Hide resolved

size_t nthreads = jl_atomic_load_acquire(&jl_n_threads);
jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
int ctid = -1;
Expand Down Expand Up @@ -1232,6 +1242,10 @@ JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT
jl_safe_printf("thread (%d) ==== End thread %d\n", ctid, ptls2->tid + 1);
}
jl_safe_printf("thread (%d) ++++ Done\n", ctid);

if (!jl_inside_heartbeat_thread()) {
jl_heartbeat_resume();
}
}

#ifdef __cplusplus
Expand Down
59 changes: 55 additions & 4 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,45 @@ JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n,
return 0;
}

// temporarily pause the heartbeat thread
JL_DLLEXPORT int jl_heartbeat_pause(void)
{
if (!heartbeat_enabled) {
return -1;
}
heartbeat_enabled = 0;
return 0;
}

// resume the paused heartbeat thread
JL_DLLEXPORT int jl_heartbeat_resume(void)
{
// cannot resume if the heartbeat thread is already running
if (heartbeat_enabled) {
return -1;
}

// cannot resume if we weren't paused (disabled != paused)
if (heartbeat_interval_s == 0) {
return -1;
}

// heartbeat thread must be ready
if (uv_sem_trywait(&heartbeat_off_sem) != 0) {
return -1;
}

// reset state as we've been paused
n_hbs_missed = 0;
n_hbs_recvd = 0;
nickrobinson251 marked this conversation as resolved.
Show resolved Hide resolved
tasks_showed = 0;

// resume
heartbeat_enabled = 1;
uv_sem_post(&heartbeat_on_sem); // wake the heartbeat thread
return 0;
}

// heartbeat
JL_DLLEXPORT void jl_heartbeat(void)
{
Expand Down Expand Up @@ -1099,7 +1138,7 @@ void jl_heartbeat_threadfun(void *arg)
uv_sem_post(&heartbeat_off_sem);

// sleep the thread here; this semaphore is posted in
// jl_heartbeat_enable()
// jl_heartbeat_enable() or jl_heartbeat_resume()
uv_sem_wait(&heartbeat_on_sem);

// Set the sleep duration.
Expand All @@ -1111,7 +1150,7 @@ void jl_heartbeat_threadfun(void *arg)
// heartbeat is enabled; sleep, waiting for the desired interval
sleep_for(s, ns);

// if heartbeats were turned off while we were sleeping, reset
// if heartbeats were turned off/paused while we were sleeping, reset
if (!heartbeat_enabled) {
continue;
}
Expand All @@ -1122,13 +1161,15 @@ void jl_heartbeat_threadfun(void *arg)
tchb = jl_hrtime() - t0;

// adjust the next sleep duration based on how long the heartbeat
// check took
// check took, but if it took too long then use the normal duration
rs = 1;
while (tchb > 1e9) {
rs++;
tchb -= 1e9;
}
s = heartbeat_interval_s - rs;
if (rs < heartbeat_interval_s) {
s = heartbeat_interval_s - rs;
}
ns = 1e9 - tchb;
}
}
Expand All @@ -1150,6 +1191,16 @@ JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n,
return -1;
}

JL_DLLEXPORT int jl_heartbeat_pause(void)
{
return -1;
}

JL_DLLEXPORT int jl_heartbeat_resume(void)
{
return -1;
}

JL_DLLEXPORT void jl_heartbeat(void)
{
}
Expand Down