Use a more correct algorithm for the global pool block list
The previous algorithm contained a data race which could cause some
nodes of the list to be incorrectly discarded.
Benoit Vey authored and SeanTAllen committed Mar 17, 2017
1 parent c708009 commit 1eb70dc
Showing 1 changed file with 105 additions and 158 deletions.
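
For orientation before the diff: the old code linked the blocks of the global list together with compare-and-swap updates on (ABA-protected) pointers, and, per the commit message, a race between concurrent updates could discard nodes. The new code turns pool_block_global into a plain pool_block_t that acts as a sentinel head, gives every block an atomic acquired flag used as a try-lock on the predecessor while a link is changed, and counts the threads currently inside a push or pull in in_pool_block_global. The following is a minimal C11 sketch of the push side only; node_t, list_head, active_threads and list_push are illustrative names rather than the runtime's, plain stdatomic types stand in for the PONY_ATOMIC macros, and the Valgrind annotations are left out.

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

// Illustrative stand-ins for pool_block_t, pool_block_global and
// in_pool_block_global. Only the fields needed for the list are shown.
typedef struct node_t
{
  _Atomic(struct node_t*) next; // plays the role of pool_block_t.global
  atomic_bool acquired;         // per-node try-lock
  size_t size;
} node_t;

static node_t list_head;             // sentinel head, like pool_block_global
static atomic_size_t active_threads; // like in_pool_block_global

static void list_push(node_t* node)
{
  atomic_store_explicit(&node->acquired, false, memory_order_relaxed);
  atomic_fetch_add_explicit(&active_threads, 1, memory_order_relaxed);

  while(true)
  {
    // Walk the size-sorted list, without any lock, to find the insertion
    // point for this node.
    node_t* prev = &list_head;
    node_t* pos = atomic_load_explicit(&prev->next, memory_order_acquire);

    while((pos != NULL) && (node->size > pos->size))
    {
      prev = pos;
      pos = atomic_load_explicit(&pos->next, memory_order_acquire);
    }

    // Try to lock the predecessor; if another thread holds it, start over.
    if(atomic_exchange_explicit(&prev->acquired, true, memory_order_acquire))
      continue;

    // The list may have changed between the walk and the lock, so re-check
    // the link before splicing; otherwise unlock and retry.
    if(atomic_load_explicit(&prev->next, memory_order_relaxed) != pos)
    {
      atomic_store_explicit(&prev->acquired, false, memory_order_relaxed);
      continue;
    }

    atomic_store_explicit(&node->next, pos, memory_order_relaxed);
    atomic_store_explicit(&prev->next, node, memory_order_release);
    atomic_store_explicit(&prev->acquired, false, memory_order_release);
    break;
  }

  atomic_fetch_sub_explicit(&active_threads, 1, memory_order_release);
}

The unlocked walk plus the re-check after taking the predecessor's lock is what replaces the old CAS retry loop: if the link moved underneath the walker, the insert is simply retried from the head. A matching sketch of the pull side follows the diff below.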
263 changes: 105 additions & 158 deletions src/libponyrt/mem/pool.c
@@ -68,33 +68,18 @@ typedef struct pool_global_t
#endif
} pool_global_t;

typedef struct pool_block_t pool_block_t;

PONY_ABA_PROTECTED_PTR_DECLARE(pool_block_t)

/// An item on an either thread-local or global list of free blocks.
struct pool_block_t
typedef struct pool_block_t
{
#ifdef PLATFORM_IS_X86
union
{
struct
{
pool_block_t* prev;
pool_block_t* next;
};
PONY_ATOMIC_ABA_PROTECTED_PTR(pool_block_t) global;
};
#else
pool_block_t* prev;
struct pool_block_t* prev;
union
{
pool_block_t* next;
PONY_ATOMIC(pool_block_t*) global;
struct pool_block_t* next;
PONY_ATOMIC(struct pool_block_t*) global;
};
#endif
size_t size;
};
PONY_ATOMIC(bool) acquired;
} pool_block_t;

/// A thread local list of free blocks header.
typedef struct pool_block_header_t
@@ -124,11 +109,8 @@ static pool_global_t pool_global[POOL_COUNT] =
{POOL_MIN << 15, POOL_MAX / (POOL_MIN << 15), {{NULL, 0}}},
};

#ifdef PLATFORM_IS_X86
static PONY_ATOMIC_ABA_PROTECTED_PTR(pool_block_t) pool_block_global;
#else
static PONY_ATOMIC(pool_block_t*) pool_block_global;
#endif
static pool_block_t pool_block_global;
static PONY_ATOMIC(size_t) in_pool_block_global;

static __pony_thread_local pool_local_t pool_local[POOL_COUNT];
static __pony_thread_local pool_block_header_t pool_block_header;
@@ -271,7 +253,7 @@ static void track_init()
return;

track.init = true;
track.thread_id = atomic_fetch_add(&track_global_thread_id, 1,
track.thread_id = atomic_fetch_add_explicit(&track_global_thread_id, 1,
memory_order_relaxed);
track_global_info[track.thread_id] = &track;

@@ -414,199 +396,164 @@ static void pool_block_insert(pool_block_t* block)
next->prev = block;
}

static void pool_block_push(pool_block_t* block)
void pool_block_push(pool_block_t* block)
{
#ifdef PLATFORM_IS_X86
PONY_ABA_PROTECTED_PTR(pool_block_t)* dst = NULL;
PONY_ABA_PROTECTED_PTR(pool_block_t) cmp;
PONY_ABA_PROTECTED_PTR(pool_block_t) xchg;
cmp.object = pool_block_global.object;
cmp.counter = pool_block_global.counter;
#else
PONY_ATOMIC(pool_block_t*)* dst;
pool_block_t* next = NULL;
#endif
pool_block_t* pos;
atomic_store_explicit(&block->acquired, false, memory_order_relaxed);
atomic_fetch_add_explicit(&in_pool_block_global, 1, memory_order_relaxed);

do
while(true)
{
#ifdef PLATFORM_IS_X86
PONY_ABA_PROTECTED_PTR(pool_block_t) cmp_prev = {{NULL, 0}};

// If dst is &pool_block_global, we did a CAS iteration and failed to get
// pool_block_global. cmp already holds the right value, don't reload in
// that case.
if(dst != &pool_block_global)
{
cmp = bigatomic_load_explicit(&pool_block_global, memory_order_acquire);
# ifdef USE_VALGRIND
ANNOTATE_HAPPENS_AFTER(&pool_block_global);
# endif
}

pos = cmp.object;
#else
pos = atomic_load_explicit(&pool_block_global, memory_order_acquire);
# ifdef USE_VALGRIND
ANNOTATE_HAPPENS_AFTER(&pool_block_global);
# endif
pool_block_t* pos = atomic_load_explicit(&pool_block_global.global,
memory_order_acquire);
#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_AFTER(&pool_block_global.global);
#endif

// Find an insertion position. The list is sorted and stays sorted after an
// insertion.
pool_block_t* prev = NULL;
pool_block_t* prev = &pool_block_global;
while((pos != NULL) && (block->size > pos->size))
{
#ifdef PLATFORM_IS_X86
cmp_prev = cmp;
cmp = bigatomic_load_explicit(&pos->global, memory_order_acquire);
prev = pos;
pos = cmp.object;
#else
prev = pos;
pos = atomic_load_explicit(&pos->global, memory_order_acquire);
#endif
#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_AFTER(&pos->global);
#endif
}

if(prev == NULL)
{
// Insert at the beginning.
#ifdef PLATFORM_IS_X86
PONY_ABA_PROTECTED_PTR(pool_block_t)* ptr = &block->global;
ptr->object = NULL;
ptr->counter = cmp_prev.counter + 1;
#else
atomic_store_explicit(&block->global, NULL, memory_order_relaxed);
if(atomic_exchange_explicit(&prev->acquired, true, memory_order_acquire))
continue;

#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_AFTER(&prev->acquired);
#endif
dst = &pool_block_global;
} else if(pos == NULL) {
// Insert at the end.
#ifdef PLATFORM_IS_X86
PONY_ABA_PROTECTED_PTR(pool_block_t)* ptr = &block->global;
ptr->object = NULL;
ptr->counter = cmp.counter + 1;
#else
atomic_store_explicit(&block->global, NULL, memory_order_relaxed);

pool_block_t* check_pos = atomic_load_explicit(&prev->global,
memory_order_relaxed);

if(pos != check_pos)
{
atomic_store_explicit(&prev->acquired, false, memory_order_relaxed);
continue;
}

atomic_store_explicit(&block->global, pos, memory_order_relaxed);
#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_BEFORE(&prev->global);
#endif
dst = &prev->global;
} else {
#ifdef PLATFORM_IS_X86
cmp = bigatomic_load_explicit(&pos->global, memory_order_relaxed);
PONY_ABA_PROTECTED_PTR(pool_block_t)* ptr = &block->global;
ptr->object = cmp.object;
ptr->counter = cmp.counter + 1;
#else
next = atomic_load_explicit(&pos->global, memory_order_relaxed);
atomic_store_explicit(&block->global, next, memory_order_relaxed);
atomic_store_explicit(&prev->global, block, memory_order_release);
#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_BEFORE(&prev->acquired);
#endif
dst = &pos->global;
}
atomic_store_explicit(&prev->acquired, false, memory_order_release);

// If the compare_exchange fails, somebody else got the node and we have to
// look for another position to insert the block.
break;
}

#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_BEFORE(dst);
#endif
#ifdef PLATFORM_IS_X86
xchg.object = block;
xchg.counter = cmp.counter + 1;
} while(!bigatomic_compare_exchange_weak_explicit(dst, &cmp, xchg,
memory_order_release, memory_order_relaxed));
#else
} while(!atomic_compare_exchange_weak_explicit(dst, &next, block,
memory_order_release, memory_order_relaxed));
ANNOTATE_HAPPENS_BEFORE(&in_pool_block_global);
#endif
atomic_fetch_sub_explicit(&in_pool_block_global, 1, memory_order_release);
}

static void* pool_block_pull(size_t size)
void* pool_block_pull(size_t size)
{
pool_block_t* block;
#ifdef PLATFORM_IS_X86
PONY_ABA_PROTECTED_PTR(pool_block_t)* dst = NULL;
PONY_ABA_PROTECTED_PTR(pool_block_t) cmp;
PONY_ABA_PROTECTED_PTR(pool_block_t) xchg;
cmp.object = pool_block_global.object;
pool_block_t* block = atomic_load_explicit(&pool_block_global.global,
memory_order_relaxed);

// Fast bailout path without additional branching.
if(cmp.object == NULL)
if(block == NULL)
return NULL;

cmp.counter = pool_block_global.counter;
#else
PONY_ATOMIC(pool_block_t*)* dst;
pool_block_t* next;
#endif
pool_block_t* prev = NULL;
atomic_fetch_add_explicit(&in_pool_block_global, 1, memory_order_relaxed);

do
while(true)
{
#ifdef PLATFORM_IS_X86
// If dst is &pool_block_global, we did a CAS iteration and failed to get
// pool_block_global. cmp already holds the right value, don't reload in
// that case.
if(dst != &pool_block_global)
cmp = bigatomic_load_explicit(&pool_block_global, memory_order_relaxed);
block = atomic_load_explicit(&pool_block_global.global,
memory_order_relaxed);

block = cmp.object;
#else
block = atomic_load_explicit(&pool_block_global, memory_order_relaxed);
#endif
if(block == NULL)
{
atomic_fetch_sub_explicit(&in_pool_block_global, 1, memory_order_relaxed);
return NULL;
}

atomic_thread_fence(memory_order_acquire);
#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_AFTER(&pool_block_global);
ANNOTATE_HAPPENS_AFTER(&pool_block_global.global);
#endif

pool_block_t* prev = &pool_block_global;

// Find a big enough block. The list is sorted.
while((block != NULL) && (size > block->size))
{
#ifdef PLATFORM_IS_X86
prev = block;
cmp = bigatomic_load_explicit(&block->global, memory_order_acquire);
block = cmp.object;
#else
prev = block;
block = atomic_load_explicit(&block->global, memory_order_acquire);
#endif
#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_AFTER(&block->global);
#endif
}

// No suitable block.
if(block == NULL)
{
atomic_fetch_sub_explicit(&in_pool_block_global, 1, memory_order_relaxed);
return NULL;
}

#ifdef PLATFORM_IS_X86
xchg = bigatomic_load_explicit(&block->global, memory_order_relaxed);
xchg.counter = cmp.counter + 1;
#else
next = atomic_load_explicit(&block->global, memory_order_relaxed);
if(atomic_exchange_explicit(&prev->acquired, true, memory_order_acquire))
continue;

#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_AFTER(&prev->acquired);
#endif
dst = (prev == NULL) ? &pool_block_global : &prev->global;

// If the compare_exchange fails, somebody else got the node and we have to
// look for another block.
#ifdef PLATFORM_IS_X86
} while(!bigatomic_compare_exchange_weak_explicit(dst, &cmp, xchg,
memory_order_acquire, memory_order_relaxed));
#else
} while(!atomic_compare_exchange_weak_explicit(dst, &block, next,
memory_order_acquire, memory_order_relaxed));
pool_block_t* check_block = atomic_load_explicit(&prev->global,
memory_order_relaxed);

if((block != check_block) ||
atomic_exchange_explicit(&block->acquired, true, memory_order_relaxed))
{
atomic_store_explicit(&prev->acquired, false, memory_order_relaxed);
continue;
}

pool_block_t* next = atomic_load_explicit(&block->global,
memory_order_relaxed);
atomic_store_explicit(&prev->global, next, memory_order_relaxed);

// Don't release block.
#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_BEFORE(&prev->acquired);
#endif
atomic_store_explicit(&prev->acquired, false, memory_order_release);

break;
}

#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_BEFORE(&in_pool_block_global);
#endif
atomic_fetch_sub_explicit(&in_pool_block_global, 1, memory_order_release);

// We can't modify block until we're sure no other thread will try to read
// from it (e.g. to check if it can be popped from the global list). To do
// this, we wait until nobody is trying to either push or pull.
while(atomic_load_explicit(&in_pool_block_global, memory_order_relaxed) != 0)
ponyint_cpu_relax();

atomic_thread_fence(memory_order_acquire);
#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_AFTER(dst);
ANNOTATE_HAPPENS_AFTER(&in_pool_block_global);
#endif

pony_assert(size <= block->size);

#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&block->global);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&block->acquired);
#endif

if(size == block->size)
return block;

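The pull side, truncated in the diff above, uses the same predecessor-locking scheme with one extra step that the in-code comment describes: after unlinking a block, the pulling thread waits for in_pool_block_global to drop to zero before modifying the block, because a thread that loaded a pointer to the block before it was unlinked may still be reading its fields. A hedged continuation of the earlier sketch (it reuses node_t, list_head and active_threads from that sketch; the names and simplifications are again illustrative, and the size-splitting done by the real pool_block_pull is omitted):

static node_t* list_pull(size_t size)
{
  atomic_fetch_add_explicit(&active_threads, 1, memory_order_relaxed);

  node_t* node;

  while(true)
  {
    // Walk the size-sorted list for the first node that is big enough.
    node_t* prev = &list_head;
    node = atomic_load_explicit(&prev->next, memory_order_acquire);

    while((node != NULL) && (size > node->size))
    {
      prev = node;
      node = atomic_load_explicit(&node->next, memory_order_acquire);
    }

    if(node == NULL)
    {
      // No suitable node on the list.
      atomic_fetch_sub_explicit(&active_threads, 1, memory_order_relaxed);
      return NULL;
    }

    // Lock the predecessor, re-validate the link, then lock the node itself.
    if(atomic_exchange_explicit(&prev->acquired, true, memory_order_acquire))
      continue;

    if((atomic_load_explicit(&prev->next, memory_order_relaxed) != node) ||
      atomic_exchange_explicit(&node->acquired, true, memory_order_relaxed))
    {
      atomic_store_explicit(&prev->acquired, false, memory_order_relaxed);
      continue;
    }

    // Unlink the node. Its own acquired flag is deliberately left set.
    node_t* next = atomic_load_explicit(&node->next, memory_order_relaxed);
    atomic_store_explicit(&prev->next, next, memory_order_relaxed);
    atomic_store_explicit(&prev->acquired, false, memory_order_release);
    break;
  }

  atomic_fetch_sub_explicit(&active_threads, 1, memory_order_release);

  // Wait until no thread is inside a push or pull before touching the node:
  // a thread that read a pointer to it before the unlink may still be
  // inspecting it.
  while(atomic_load_explicit(&active_threads, memory_order_relaxed) != 0)
    ; // the runtime calls ponyint_cpu_relax() here

  atomic_thread_fence(memory_order_acquire);
  return node;
}

Leaving the unlinked node's acquired flag set (the "Don't release block" comment in the diff) means a concurrent pusher that picked it as an insertion predecessor fails its try-lock and rescans from the head instead of splicing behind a node that has already left the list.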