Skip to content

Commit

Permalink
bgpd: update pthreads to use lib changes
Browse files Browse the repository at this point in the history
Use the new threading facilities provided in lib/ to streamline the
threads used in bgpd. In particular, all of the lifecycle code has been
removed from the I/O thread and replaced with the default loop. Did not
do the same to the keepalives thread as it is much smaller (doesn't need
the event system).

Also cleaned up some comments to match the style guide.

Signed-off-by: Quentin Young <qlyoung@cumulusnetworks.com>
  • Loading branch information
qlyoung committed Jan 24, 2018
1 parent 87a2737 commit df9b438
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 190 deletions.
138 changes: 25 additions & 113 deletions bgpd/bgp_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,100 +51,12 @@ static bool validate_header(struct peer *);
#define BGP_IO_TRANS_ERR (1 << 0) // EAGAIN or similar occurred
#define BGP_IO_FATAL_ERR (1 << 1) // some kind of fatal TCP error

/* Plumbing & control variables for thread lifecycle
* ------------------------------------------------------------------------ */
bool bgp_io_thread_run;
pthread_mutex_t *running_cond_mtx;
pthread_cond_t *running_cond;

/* Unused callback for thread_add_read() */
static int bgp_io_dummy(struct thread *thread) { return 0; }

/* Poison pill task */
static int bgp_io_finish(struct thread *thread)
{
bgp_io_thread_run = false;
return 0;
}

/* Extern lifecycle control functions. init -> start -> stop
* ------------------------------------------------------------------------ */
void bgp_io_init()
{
bgp_io_thread_run = false;

running_cond_mtx = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_mutex_t));
running_cond = XCALLOC(MTYPE_PTHREAD_PRIM, sizeof(pthread_cond_t));

pthread_mutex_init(running_cond_mtx, NULL);
pthread_cond_init(running_cond, NULL);

/* unlocked in bgp_io_wait_running() */
pthread_mutex_lock(running_cond_mtx);
}

void *bgp_io_start(void *arg)
{
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
fpt->master->owner = pthread_self();

// fd so we can sleep in poll()
int sleeper[2];
pipe(sleeper);
thread_add_read(fpt->master, &bgp_io_dummy, NULL, sleeper[0], NULL);

// we definitely don't want to handle signals
fpt->master->handle_signals = false;

struct thread task;

pthread_mutex_lock(running_cond_mtx);
{
bgp_io_thread_run = true;
pthread_cond_signal(running_cond);
}
pthread_mutex_unlock(running_cond_mtx);

while (bgp_io_thread_run) {
if (thread_fetch(fpt->master, &task)) {
thread_call(&task);
}
}

close(sleeper[1]);
close(sleeper[0]);

return NULL;
}

void bgp_io_wait_running()
{
while (!bgp_io_thread_run)
pthread_cond_wait(running_cond, running_cond_mtx);

/* locked in bgp_io_init() */
pthread_mutex_unlock(running_cond_mtx);
}

int bgp_io_stop(void **result, struct frr_pthread *fpt)
{
thread_add_event(fpt->master, &bgp_io_finish, NULL, 0, NULL);
pthread_join(fpt->thread, result);

pthread_mutex_destroy(running_cond_mtx);
pthread_cond_destroy(running_cond);

XFREE(MTYPE_PTHREAD_PRIM, running_cond_mtx);
XFREE(MTYPE_PTHREAD_PRIM, running_cond);

return 0;
}

/* Extern API -------------------------------------------------------------- */
/* Thread external API ----------------------------------------------------- */

void bgp_writes_on(struct peer *peer)
{
assert(bgp_io_thread_run);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

assert(peer->status != Deleted);
assert(peer->obuf);
Expand All @@ -154,18 +66,15 @@ void bgp_writes_on(struct peer *peer)
assert(!peer->t_connect_check_w);
assert(peer->fd);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

thread_add_write(fpt->master, bgp_process_writes, peer, peer->fd,
&peer->t_write);
SET_FLAG(peer->thread_flags, PEER_THREAD_WRITES_ON);
}

void bgp_writes_off(struct peer *peer)
{
assert(bgp_io_thread_run);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

thread_cancel_async(fpt->master, &peer->t_write, NULL);
THREAD_OFF(peer->t_generate_updgrp_packets);
Expand All @@ -175,7 +84,8 @@ void bgp_writes_off(struct peer *peer)

void bgp_reads_on(struct peer *peer)
{
assert(bgp_io_thread_run);
struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

assert(peer->status != Deleted);
assert(peer->ibuf);
Expand All @@ -186,8 +96,6 @@ void bgp_reads_on(struct peer *peer)
assert(!peer->t_connect_check_w);
assert(peer->fd);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);

thread_add_read(fpt->master, bgp_process_reads, peer, peer->fd,
&peer->t_read);

Expand All @@ -196,19 +104,18 @@ void bgp_reads_on(struct peer *peer)

void bgp_reads_off(struct peer *peer)
{
assert(bgp_io_thread_run);

struct frr_pthread *fpt = frr_pthread_get(PTHREAD_IO);
assert(fpt->running);

thread_cancel_async(fpt->master, &peer->t_read, NULL);
THREAD_OFF(peer->t_process_packet);

UNSET_FLAG(peer->thread_flags, PEER_THREAD_READS_ON);
}

/* Internal functions ------------------------------------------------------- */
/* Thread internal functions ----------------------------------------------- */

/**
/*
* Called from I/O pthread when a file descriptor has become ready for writing.
*/
static int bgp_process_writes(struct thread *thread)
Expand All @@ -231,11 +138,13 @@ static int bgp_process_writes(struct thread *thread)
}
pthread_mutex_unlock(&peer->io_mtx);

if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) { /* no problem */
/* no problem */
if (CHECK_FLAG(status, BGP_IO_TRANS_ERR)) {
}

/* problem */
if (CHECK_FLAG(status, BGP_IO_FATAL_ERR)) {
reschedule = false; /* problem */
reschedule = false;
fatal = true;
}

Expand All @@ -250,7 +159,7 @@ static int bgp_process_writes(struct thread *thread)
return 0;
}

/**
/*
* Called from I/O pthread when a file descriptor has become ready for reading,
* or has hung up.
*
Expand Down Expand Up @@ -321,8 +230,10 @@ static int bgp_process_reads(struct thread *thread)
/* if this fails we are seriously screwed */
assert(pktsize <= BGP_MAX_PACKET_SIZE);

/* If we have that much data, chuck it into its own
* stream and append to input queue for processing. */
/*
* If we have that much data, chuck it into its own
* stream and append to input queue for processing.
*/
if (ringbuf_remain(ibw) >= pktsize) {
struct stream *pkt = stream_new(pktsize);
assert(ringbuf_get(ibw, pktbuf, pktsize) == pktsize);
Expand Down Expand Up @@ -356,7 +267,7 @@ static int bgp_process_reads(struct thread *thread)
return 0;
}

/**
/*
* Flush peer output buffer.
*
* This function pops packets off of peer->obuf and writes them to peer->fd.
Expand All @@ -379,7 +290,6 @@ static uint16_t bgp_write(struct peer *peer)
uint16_t status = 0;
uint32_t wpkt_quanta_old;

// cache current write quanta
wpkt_quanta_old =
atomic_load_explicit(&peer->bgp->wpkt_quanta, memory_order_relaxed);

Expand All @@ -398,7 +308,7 @@ static uint16_t bgp_write(struct peer *peer)
}

goto done;
} else if (num != writenum) // incomplete write
} else if (num != writenum)
stream_forward_getp(s, num);

} while (num != writenum);
Expand Down Expand Up @@ -427,8 +337,10 @@ static uint16_t bgp_write(struct peer *peer)
if (peer->v_start >= (60 * 2))
peer->v_start = (60 * 2);

/* Handle Graceful Restart case where the state changes
* to Connect instead of Idle */
/*
* Handle Graceful Restart case where the state changes
* to Connect instead of Idle.
*/
BGP_EVENT_ADD(peer, BGP_Stop);
goto done;

Expand Down Expand Up @@ -472,7 +384,7 @@ done : {
return status;
}

/**
/*
* Reads a chunk of data from peer->fd into peer->ibuf_work.
*
* @return status flag (see top-of-file)
Expand Down
17 changes: 0 additions & 17 deletions bgpd/bgp_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,13 @@
#include "bgpd/bgpd.h"
#include "frr_pthread.h"

/**
* Initializes data structures and flags for the write thread.
*
* This function must be called from the main thread before
* bgp_writes_start() is invoked.
*/
extern void bgp_io_init(void);

/**
* Start function for write thread.
*
* @param arg - unused
*/
extern void *bgp_io_start(void *arg);

/**
* Wait until the IO thread is ready to accept jobs.
*
* This function must be called immediately after the thread has been created
* for running. Use of other functions before calling this one will result in
* undefined behavior.
*/
extern void bgp_io_wait_running(void);

/**
* Start function for write thread.
*
Expand Down
Loading

0 comments on commit df9b438

Please sign in to comment.