Skip to content

Commit

Permalink
Locking around output operations. Allow threads to exit gracefully.
Browse files Browse the repository at this point in the history
  • Loading branch information
bmah888 committed Nov 8, 2023
1 parent 0b1be25 commit fc86f85
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 39 deletions.
3 changes: 3 additions & 0 deletions src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ struct iperf_stream
struct iperf_test* test;

pthread_t thr;
int done;

/* configurable members */
int local_port;
Expand Down Expand Up @@ -271,6 +272,8 @@ enum debug_level {

struct iperf_test
{
pthread_mutex_t print_mutex;

char role; /* 'c' lient or 's' erver */
enum iperf_mode mode;
int sender_has_retransmits;
Expand Down
90 changes: 79 additions & 11 deletions src/iperf_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -2807,6 +2807,7 @@ struct iperf_test *
iperf_new_test()
{
struct iperf_test *test;
int rc;

test = (struct iperf_test *) malloc(sizeof(struct iperf_test));
if (!test) {
Expand All @@ -2816,6 +2817,21 @@ iperf_new_test()
/* initialize everything to zero */
memset(test, 0, sizeof(struct iperf_test));

/* Initialize mutex for printing output */
pthread_mutexattr_t mutexattr;
pthread_mutexattr_init(&mutexattr);
rc = pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_ERRORCHECK);
if (rc != 0) {
errno = rc;
perror("iperf_new_test: pthread_mutexattr_settype");
}

if (pthread_mutex_init(&(test->print_mutex), &mutexattr) != 0) {
perror("iperf_new_test: pthread_mutex_init");
}

pthread_mutexattr_destroy(&mutexattr);

test->settings = (struct iperf_settings *) malloc(sizeof(struct iperf_settings));
if (!test->settings) {
free(test);
Expand Down Expand Up @@ -3069,6 +3085,14 @@ iperf_free_test(struct iperf_test *test)
free(prot);
}

/* Destroy print mutex. iperf_printf() doesn't work after this point */
int rc;
rc = pthread_mutex_destroy(&(test->print_mutex));
if (rc != 0) {
errno = rc;
perror("iperf_free_test: pthread_mutex_destroy");
}

if (test->logfile) {
free(test->logfile);
test->logfile = NULL;
Expand Down Expand Up @@ -4798,7 +4822,14 @@ iperf_json_finish(struct iperf_test *test)
if (test->json_output_string == NULL) {
return -1;
}

if (pthread_mutex_lock(&(test->print_mutex)) != 0) {
perror("iperf_json_finish: pthread_mutex_lock");
}
fprintf(test->outfile, "%s\n", test->json_output_string);
if (pthread_mutex_unlock(&(test->print_mutex)) != 0) {
perror("iperf_json_finish: pthread_mutex_unlock");
}
iflush(test);
cJSON_Delete(test->json_top);
test->json_top = NULL;
Expand Down Expand Up @@ -4907,6 +4938,10 @@ iperf_printf(struct iperf_test *test, const char* format, ...)
struct tm *ltm = NULL;
char *ct = NULL;

if (pthread_mutex_lock(&(test->print_mutex)) != 0) {
perror("iperf_print: pthread_mutex_lock");
}

/* Timestamp if requested */
if (iperf_get_test_timestamps(test)) {
time(&now);
Expand All @@ -4930,37 +4965,47 @@ iperf_printf(struct iperf_test *test, const char* format, ...)
if (test->role == 'c') {
if (ct) {
r0 = fprintf(test->outfile, "%s", ct);
if (r0 < 0)
return r0;
if (r0 < 0) {
r = r0;
goto bottom;
}
r += r0;
}
if (test->title) {
r0 = fprintf(test->outfile, "%s: ", test->title);
if (r0 < 0)
return r0;
if (r0 < 0) {
r = r0;
goto bottom;
}
r += r0;
}
va_start(argp, format);
r0 = vfprintf(test->outfile, format, argp);
va_end(argp);
if (r0 < 0)
return r0;
if (r0 < 0) {
r = r0;
goto bottom;
}
r += r0;
}
else if (test->role == 's') {
if (ct) {
r0 = snprintf(linebuffer, sizeof(linebuffer), "%s", ct);
if (r0 < 0)
return r0;
if (r0 < 0) {
r = r0;
goto bottom;
}
r += r0;
}
/* Should always be true as long as sizeof(ct) < sizeof(linebuffer) */
if (r < sizeof(linebuffer)) {
va_start(argp, format);
r0 = vsnprintf(linebuffer + r, sizeof(linebuffer) - r, format, argp);
va_end(argp);
if (r0 < 0)
return r0;
if (r0 < 0) {
r = r0;
goto bottom;
}
r += r0;
}
fprintf(test->outfile, "%s", linebuffer);
Expand All @@ -4971,11 +5016,34 @@ iperf_printf(struct iperf_test *test, const char* format, ...)
TAILQ_INSERT_TAIL(&(test->server_output_list), l, textlineentries);
}
}

bottom:
if (pthread_mutex_unlock(&(test->print_mutex)) != 0) {
perror("iperf_print: pthread_mutex_unlock");
}

return r;
}

int
iflush(struct iperf_test *test)
{
return fflush(test->outfile);
int rc2;

int rc;
rc = pthread_mutex_lock(&(test->print_mutex));
if (rc != 0) {
errno = rc;
perror("iflush: pthread_mutex_lock");
}

rc2 = fflush(test->outfile);

rc = pthread_mutex_unlock(&(test->print_mutex));
if (rc != 0) {
errno = rc;
perror("iflush: pthread_mutex_unlock");
}

return rc2;
}
32 changes: 11 additions & 21 deletions src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ iperf_client_worker_start(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
struct iperf_test *test = sp->test;

while (1) {
while (! (test->done)) {
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d\n", sp->socket);
}
sleep(1);
}
return NULL;
}

int
Expand Down Expand Up @@ -706,22 +707,18 @@ iperf_run_client(struct iperf_test * test)
/* Cancel sender threads */
SLIST_FOREACH(sp, &test->streams, streams) {
if (sp->sender) {
if (pthread_cancel(sp->thr) != 0) {
i_errno = IEPTHREADCANCEL;
goto cleanup_and_fail;
}
sp->done = 1;
if (pthread_join(sp->thr, NULL) != 0) {
i_errno = IEPTHREADJOIN;
goto cleanup_and_fail;
}
sp->thr = 0;
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d cancelled\n", sp->socket);
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
}
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Sender threads cancelled\n");
iperf_printf(test, "Sender threads stopped\n");
}

// Unset non-blocking for non-UDP tests
Expand Down Expand Up @@ -753,22 +750,18 @@ iperf_run_client(struct iperf_test * test)
/* Cancel receiver threads */
SLIST_FOREACH(sp, &test->streams, streams) {
if (!sp->sender) {
if (pthread_cancel(sp->thr) != 0) {
i_errno = IEPTHREADCANCEL;
goto cleanup_and_fail;
}
sp->done = 1;
if (pthread_join(sp->thr, NULL) != 0) {
i_errno = IEPTHREADJOIN;
goto cleanup_and_fail;
}
sp->thr = 0;
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d cancelled\n", sp->socket);
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
}
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Receiver threads cancelled\n");
iperf_printf(test, "Receiver threads stopped\n");
}

if (test->json_output) {
Expand All @@ -787,20 +780,17 @@ iperf_run_client(struct iperf_test * test)
/* Cancel all threads */
i_errno_save = i_errno;
SLIST_FOREACH(sp, &test->streams, streams) {
if (pthread_cancel(sp->thr) != 0) {
i_errno = IEPTHREADCANCEL;
iperf_err(test, "cleanup_and_fail - %s", iperf_strerror(i_errno));
}
sp->done = 1;
if (pthread_join(sp->thr, NULL) != 0) {
i_errno = IEPTHREADCANCEL;
iperf_err(test, "cleanup_and_fail - %s", iperf_strerror(i_errno));
}
if (test->debug >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d cancelled\n", sp->socket);
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All threads cancelled\n");
iperf_printf(test, "All threads stopped\n");
}
i_errno = i_errno_save;

Expand Down
17 changes: 17 additions & 0 deletions src/iperf_error.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ iperf_err(struct iperf_test *test, const char *format, ...)
struct tm *ltm = NULL;
char *ct = NULL;

if (pthread_mutex_lock(&(test->print_mutex)) != 0) {
perror("iperf_err: pthread_mutex_lock");
}

/* Timestamp if requested */
if (test != NULL && test->timestamps) {
time(&now);
Expand Down Expand Up @@ -74,6 +78,10 @@ iperf_err(struct iperf_test *test, const char *format, ...)
}
}
va_end(argp);

if (pthread_mutex_unlock(&(test->print_mutex)) != 0) {
perror("iperf_err: pthread_mutex_unlock");
}
}

/* Do a printf to stderr or log file as appropriate, then exit. */
Expand All @@ -86,6 +94,10 @@ iperf_errexit(struct iperf_test *test, const char *format, ...)
struct tm *ltm = NULL;
char *ct = NULL;

if (pthread_mutex_lock(&(test->print_mutex)) != 0) {
perror("iperf_errexit: pthread_mutex_lock");
}

/* Timestamp if requested */
if (test != NULL && test->timestamps) {
time(&now);
Expand Down Expand Up @@ -114,6 +126,11 @@ iperf_errexit(struct iperf_test *test, const char *format, ...)
}
fprintf(stderr, "iperf3: %s\n", str);
}

if (pthread_mutex_unlock(&(test->print_mutex)) != 0) {
perror("iperf_errexit: pthread_mutex_unlock");
}

va_end(argp);
if (test)
iperf_delete_pidfile(test);
Expand Down
12 changes: 5 additions & 7 deletions src/iperf_server_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ iperf_server_worker_start(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
struct iperf_test *test = sp->test;

while (1) {
while (! (test->done)) {
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d\n", sp->socket);
}
sleep(1);
}
return NULL;
}

int
Expand Down Expand Up @@ -404,22 +405,19 @@ cleanup_server(struct iperf_test *test)
/* Cancel threads */
int i_errno_save = i_errno;
SLIST_FOREACH(sp, &test->streams, streams) {
if (pthread_cancel(sp->thr) != 0) {
i_errno = IEPTHREADCANCEL;
iperf_err(test, "cleanup_server - %s", iperf_strerror(i_errno));
}
sp->done = 1;
if (pthread_join(sp->thr, NULL) != 0) {
i_errno = IEPTHREADJOIN;
iperf_err(test, "cleanup_server - %s", iperf_strerror(i_errno));
}
if (test->debug >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d cancelled\n", sp->socket);
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
}
}
i_errno = i_errno_save;

if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All threads cancelled\n");
iperf_printf(test, "All threads stopped\n");
}

/* Close open streams */
Expand Down

0 comments on commit fc86f85

Please sign in to comment.