Skip to content

Commit

Permalink
use atomic functions for global switches
Browse files Browse the repository at this point in the history
  • Loading branch information
djnym committed Oct 28, 2016
1 parent 283d12d commit 82327d9
Show file tree
Hide file tree
Showing 17 changed files with 154 additions and 105 deletions.
3 changes: 3 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
Version 1.0.2 (anthonym)
* switch to atomic operation functions for global switches to appease helgrind

Version 1.0.1 (anthonym)
* forgot to bump lwes/mondemand versions for travis

Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dnl Process this file with autoconf to produce a configure script.

AC_INIT([lwes-journaller], [1.0.1], [lwes-devel@lists.sourceforge.net])
AC_INIT([lwes-journaller], [1.0.2], [lwes-devel@lists.sourceforge.net])
AM_INIT_AUTOMAKE([foreign])

dnl Determine the host type for the host specific inclusion below
Expand Down
2 changes: 1 addition & 1 deletion src/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ static FILE* get_log()
{
fclose(log);
log = NULL;
gbl_rotate_log = 0;
__sync_bool_compare_and_swap(&gbl_rotate_log,1,0);
}

/* if we have no log open now, open one. */
Expand Down
4 changes: 2 additions & 2 deletions src/opt.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ char* arg_disk_journals[10];
* round intervals starting at beginning of day)
*/
int arg_journal_rotate_interval = 0;
int arg_wakup_interval_ms = WAKEUP_MS;
int arg_wakeup_interval_ms = WAKEUP_MS;

int arg_nodaemonize = 0;

Expand Down Expand Up @@ -140,7 +140,7 @@ void process_options(int argc, const char* argv[])
{ "site", 'n', POPT_ARG_INT, &arg_site, 0, "Site id", "int" },
{ "sockbuffer", 0, POPT_ARG_INT, &arg_sockbuffer, 0, "Receive socket buffer size", "bytes" },
{ "ttl", 0, POPT_ARG_INT, &arg_ttl, 0, "Emitting TTL value", "hops" },
{ "wakeup", 'w', POPT_ARG_INT, &arg_wakup_interval_ms, 0, "How often to break checking for signals", "milliseconds" },
{ "wakeup", 'w', POPT_ARG_INT, &arg_wakeup_interval_ms, 0, "How often to break checking for signals", "milliseconds" },
{ "user", 0, POPT_ARG_STRING, &arg_journal_user, 0, "Owner of journal files", "user" },
{ "version", 'v', POPT_ARG_NONE, &arg_version, 0, "Display version, then exit", 0 },
{ "xport-type", 'x', POPT_ARG_STRING, &arg_xport, 0, "Transport, dflt=udp", "{" ARG_UDP ", ...}" },
Expand Down
4 changes: 2 additions & 2 deletions src/opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ extern int arg_ttl;
extern int arg_journal_uid;
extern int arg_version;
extern const char* arg_xport;
extern int arg_wakup_interval_ms;
extern int arg_wakeup_interval_ms;

#ifdef HAVE_MONDEMAND
extern const char* arg_mondemand_host;
Expand All @@ -67,7 +67,7 @@ extern const char* arg_mondemand_program_id;
/* arg_xport: */
#define ARG_UDP "udp"

#define WAKEUP_MS 100
#define WAKEUP_MS 100

void process_options(int argc, const char* argv[]);
void options_destructor (void);
Expand Down
4 changes: 2 additions & 2 deletions src/process_model.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void process_model(const char* argv[])
"trigger log rotate.\n", queue_to_journal_pid);
kill(queue_to_journal_pid, SIGHUP);
}
gbl_rotate_dequeue = 0;
__sync_bool_compare_and_swap(&gbl_rotate_dequeue,1,0);
}
if ( gbl_rotate_enqueue )
{
Expand All @@ -166,7 +166,7 @@ void process_model(const char* argv[])
"trigger log rotate.\n", xport_to_queue_pid);
kill(xport_to_queue_pid, SIGHUP);
}
gbl_rotate_dequeue = 0;
__sync_bool_compare_and_swap(&gbl_rotate_enqueue,1,0);
}
continue;
}
Expand Down
2 changes: 2 additions & 0 deletions src/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include <stddef.h>

#define QUEUE_INTR -2

/* Queue methods:
*
* open() -- opens a queue, return 0 on success, -1 on error
Expand Down
6 changes: 4 additions & 2 deletions src/queue_mqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ static int xread (struct queue* this_queue, void* buf,
LOG_PROG("about to call mq_receive(). gbl_done=%d\n", gbl_done);

/* use blocking reads unless we're shutting down. */
struct timespec time_buf = { time(NULL), arg_wakup_interval_ms * 1000000 };
int wakeup_secs = arg_wakeup_interval_ms / 1000;
int wakeup_ms = arg_wakeup_interval_ms % 1000;
struct timespec time_buf = { time(NULL) + wakeup_secs, wakeup_ms * 1000000 };
mq_rec_rtrn = mq_timedreceive (ppriv->mq, buf, count, &pri, &time_buf);

if (mq_rec_rtrn < 0 )
Expand All @@ -157,7 +159,7 @@ static int xread (struct queue* this_queue, void* buf,
down, so no need to print errors. */
case ETIMEDOUT:
case EINTR:
return mq_rec_rtrn;
return QUEUE_INTR;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/queue_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ static int xread (struct queue* this_queue, void* buf,
* print errors. */
case EIDRM:
case EINTR:
return msgrcv_ret;
return QUEUE_INTR;
}
}

Expand Down
117 changes: 65 additions & 52 deletions src/queue_to_journal.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ static int rotate(struct journal* jrn, int jcurr)
{
unsigned long long t0 = time_in_milliseconds(), t1;

dequeuer_stats_rotate(&dst);
dequeuer_stats_flush();

if ( jrn[jcurr].vtbl->close(&jrn[jcurr]) < 0 )
{
LOG_ER("Can't close journal \"%s\".\n", arg_journalls[jcurr]);
Expand Down Expand Up @@ -68,7 +65,7 @@ int queue_to_journal(void)
int jcurr = 0;
void* buf = NULL ;
size_t bufsiz;
int pending = 0, write_pending = 0;
int pending = 0;
unsigned long long t0, receive_time, max_receive_time=0,
total_receive_time=0, write_time, max_write_time=0, total_write_time=0;

Expand Down Expand Up @@ -113,81 +110,96 @@ int queue_to_journal(void)
LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz);
exit(EXIT_FAILURE);
}
memset(buf, 0, bufsiz);

/* Read a packet from the queue, write it to the journal. */
while ( ! gbl_done )
{
int que_read_ret;
int jrn_write_ret;

/* If we have a pending rotate to perform, do it now. */
if ( gbl_rotate_dequeue )
/* 0 out part of the the event name so if we get a rotate event
* the program will not continually rotate */
memset(buf, 0, HEADER_LENGTH+20);

t0 = time_in_milliseconds();
que_read_ret = que.vtbl->read(&que, buf, bufsiz, &pending);
if (que_read_ret >= 0)
{
receive_time = time_in_milliseconds() - t0;
max_receive_time =
max_receive_time < receive_time ? receive_time : max_receive_time;
total_receive_time += receive_time;
dequeuer_stats_record(&dst, que_read_ret-HEADER_LENGTH, pending);
}
else if (que_read_ret == QUEUE_INTR )
{
/* ignore expected interrupts */
}
else
{
LOG_INF("Received other interruption\n");
continue; /* no event, so do not process the rest */
}

// is this a command event?
if ( header_is_rotate(buf) || gbl_rotate_dequeue || gbl_done )
{
if (header_is_rotate (buf))
{
// is it a new enough Command::Rotate, or masked out?
memcpy(&dst.latest_rotate_header, buf, HEADER_LENGTH);
dst.rotation_type = LJ_RT_EVENT;
}
dequeuer_stats_rotate(&dst);

/* if we are shutting down the destructor will flush stats,
* so skip so we don't get duplicate events sent to mondemand
* if it is enabled
*/
if (! gbl_done )
{
dequeuer_stats_flush();
}

LOG_INF("Maximum receive time was %0.2f seconds;"
" total receive time was %0.2f seconds\n",
max_receive_time/1000000., total_receive_time/1000000.);
LOG_INF("Maximum write time was %0.2f seconds;"
" total write time was %0.2f seconds\n",
max_write_time/1000000., total_write_time/1000000.);
LOG_INF("About to rotate journal (%d pending).\n", pending);
write_pending = 1;
jcurr = rotate(jrn,jcurr);
max_receive_time = 0;
max_write_time = 0;
total_receive_time = 0;
total_write_time = 0;

gbl_rotate_dequeue = 0;
if (gbl_rotate_dequeue)
{
__sync_bool_compare_and_swap(&gbl_rotate_dequeue,1,0);
}
}

t0 = time_in_milliseconds();
if ( (que_read_ret = que.vtbl->read(&que, buf, bufsiz, &pending)) < 0 )
if (que_read_ret != QUEUE_INTR)
{
/* queue is empty */
if (gbl_done) break; /* if we're shutting down, exit this loop. */
continue; /* no event, so do not process the rest */
t0 = time_in_milliseconds();
/* Write the packet out to the journal. */
if ( (jrn_write_ret = jrn[jcurr].vtbl->write(&jrn[jcurr],
buf, que_read_ret))
!= que_read_ret )
{
LOG_ER("Journal write error -- attempted to write %d bytes, "
"write returned %d.\n", que_read_ret, jrn_write_ret);
dequeuer_stats_record_loss(&dst);
}
write_time = time_in_milliseconds() - t0;
max_write_time =
max_write_time < write_time ? write_time : max_write_time;
total_write_time += write_time;
}
receive_time = time_in_milliseconds() - t0;
if ( max_receive_time < receive_time ) max_receive_time = receive_time;
total_receive_time += receive_time;

LOG_PROG("Read %d bytes from queue (%d pending).\n",
que_read_ret, pending);
if (write_pending)
{
LOG_INF("Done with rotating journal (%d pending).\n", pending);
write_pending = 0;
}

// is this a command event?
if ( header_is_rotate(buf) )
{ // Command::Rotate
// is it a new enough Command::Rotate, or masked out?
memcpy(&dst.latest_rotate_header, buf, HEADER_LENGTH) ;
dst.rotation_type = LJ_RT_EVENT;
gbl_rotate_dequeue = 1;
gbl_rotate_enqueue = 1;
}

t0 = time_in_milliseconds();
dequeuer_stats_record(&dst, que_read_ret-HEADER_LENGTH, pending);
/* Write the packet out to the journal. */
if ( (jrn_write_ret = jrn[jcurr].vtbl->write(&jrn[jcurr],
buf, que_read_ret))
!= que_read_ret )
{
LOG_ER("Journal write error -- attempted to write %d bytes, "
"write returned %d.\n", que_read_ret, jrn_write_ret);
dequeuer_stats_record_loss(&dst);
}
write_time = time_in_milliseconds() - t0;
if ( max_write_time < write_time ) max_write_time = write_time;
total_write_time += write_time;
} /* while ( ! gbl_done) */

dequeuer_stats_rotate(&dst);
dequeuer_stats_flush();

if ( jrn[jcurr].vtbl->close(&jrn[jcurr]) < 0 )
{
LOG_ER("Can't close journal \"%s\".\n", arg_journalls[jcurr]);
Expand All @@ -206,6 +218,7 @@ int queue_to_journal(void)
que.vtbl->dealloc(&que, buf);
que.vtbl->destructor(&que);

dequeuer_stats_rotate(&dst);
dequeuer_stats_report(&dst);
dequeuer_stats_dtor(&dst);

Expand Down
45 changes: 28 additions & 17 deletions src/serial_model.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,26 @@ static void serial_open_journal(void)
}
}

static void serial_close_journal(void)
static void serial_close_journal(int is_rotate_event)
{
if (gbl_done || gbl_rotate_enqueue)
if (gbl_done || gbl_rotate_enqueue || is_rotate_event)
{
enqueuer_stats_rotate(&est);
enqueuer_stats_flush();
gbl_rotate_enqueue = 0;
if (!gbl_done)
{
enqueuer_stats_flush();
}
__sync_bool_compare_and_swap(&gbl_rotate_enqueue,1,0);
}

if (gbl_done || gbl_rotate_dequeue)
if (gbl_done || gbl_rotate_dequeue || is_rotate_event)
{
dequeuer_stats_rotate(&dst);
dequeuer_stats_flush();
gbl_rotate_dequeue = 0;
if (!gbl_done)
{
dequeuer_stats_flush();
}
__sync_bool_compare_and_swap(&gbl_rotate_dequeue,1,0);
}

if (jrn.vtbl->close(&jrn) < 0) {
Expand All @@ -134,17 +140,22 @@ static void serial_close_journal(void)
}
}

static void serial_rotate(void)
static void serial_rotate(int is_rotate_event)
{
serial_close_journal();
serial_close_journal(is_rotate_event);
serial_open_journal();
}

static int serial_read(void)
{
unsigned long addr;
short port;
int xpt_read_ret =
short port;

/* 0 out part of the the event name so if we get a rotate event the program
* will not continually rotate */
memset(buf, 0, HEADER_LENGTH+20);

int xpt_read_ret =
xpt.vtbl->read(&xpt, buf+HEADER_LENGTH, BUFLEN-HEADER_LENGTH, &addr, &port);

if (xpt_read_ret >= 0)
Expand Down Expand Up @@ -250,7 +261,7 @@ static void serial_write(void)

static void serial_dtor(void)
{
serial_close_journal();
serial_close_journal(0);
lwes_emitter_destroy(emitter);
xpt.vtbl->destructor(&xpt);
jrn.vtbl->destructor(&jrn);
Expand All @@ -263,7 +274,7 @@ void serial_model(void)
serial_ctor();

do {
int is_rotate = 0;
int is_rotate_event = 0;
int read_ret = serial_read();
/* -1 is an error we don't deal with, so just skip out of the loop */
if (read_ret == -1) continue;
Expand All @@ -278,11 +289,11 @@ void serial_model(void)
if (header_is_rotate(buf)) {
memcpy(&dst.latest_rotate_header, buf, HEADER_LENGTH) ;
dst.rotation_type = LJ_RT_EVENT;
is_rotate = 1;
is_rotate_event = 1;
}
if (is_rotate || gbl_rotate_dequeue || gbl_rotate_enqueue) {
serial_rotate();
is_rotate = 0;
if (is_rotate_event || gbl_rotate_dequeue || gbl_rotate_enqueue) {
serial_rotate(is_rotate_event);
is_rotate_event = 0;
}
/* maybe send depth test */
if (tm >= depth_tm + depth_dtm) serial_send_buffer_depth_test();
Expand Down
Loading

0 comments on commit 82327d9

Please sign in to comment.