diff --git a/src/common/pony/detail/atomics.h b/src/common/pony/detail/atomics.h
index 912890e607..5a0d0889c0 100644
--- a/src/common/pony/detail/atomics.h
+++ b/src/common/pony/detail/atomics.h
@@ -78,39 +78,41 @@ using std::atomic_flag;
 namespace ponyint_atomics
 {
   template <typename T>
-  struct aba_protected_t
+  struct aba_protected_ptr_t
   {
-    static_assert(sizeof(T) <= sizeof(void*), "");
-    T object;
-    uintptr_t counter;
+    // Nested struct for uniform initialisation with GCC/Clang.
+    struct
+    {
+      T* object;
+      uintptr_t counter;
+    };
   };
 }
-# define PONY_ABA_PROTECTED_DECLARE(T)
-# define PONY_ABA_PROTECTED(T) ponyint_atomics::aba_protected_t<T>
+# define PONY_ABA_PROTECTED_PTR_DECLARE(T)
+# define PONY_ABA_PROTECTED_PTR(T) ponyint_atomics::aba_protected_ptr_t<T>
 #else
 # if defined(__LP64__) || defined(_WIN64)
 #  define PONY_DOUBLEWORD __int128_t
 # else
 #  define PONY_DOUBLEWORD int64_t
 # endif
-# define PONY_ABA_PROTECTED_DECLARE(T) \
+# define PONY_ABA_PROTECTED_PTR_DECLARE(T) \
   typedef union \
   { \
     struct \
    { \
-      _Static_assert(sizeof(T) <= sizeof(void*), ""); \
-      T object; \
+      T* object; \
       uintptr_t counter; \
     }; \
     PONY_DOUBLEWORD raw; \
-  } aba_protected_T;
-# define PONY_ABA_PROTECTED(T) aba_protected_T
+  } aba_protected_##T;
+# define PONY_ABA_PROTECTED_PTR(T) aba_protected_##T
 #endif
 
 // Big atomic objects (larger than machine word size) aren't consistently
 // implemented on the compilers we support. We add our own implementation to
 // make sure the objects are correctly defined and aligned.
-#define PONY_ATOMIC_ABA_PROTECTED(T) alignas(16) PONY_ABA_PROTECTED(T)
+#define PONY_ATOMIC_ABA_PROTECTED_PTR(T) alignas(16) PONY_ABA_PROTECTED_PTR(T)
 
 #ifdef PONY_WANT_ATOMIC_DEFS
 # ifdef _MSC_VER
@@ -119,17 +121,18 @@ namespace ponyint_atomics
 namespace ponyint_atomics
 {
   template <typename T>
-  inline PONY_ABA_PROTECTED(T) big_load(PONY_ABA_PROTECTED(T)* ptr)
+  inline PONY_ABA_PROTECTED_PTR(T) big_load(PONY_ABA_PROTECTED_PTR(T)* ptr)
   {
-    PONY_ABA_PROTECTED(T) ret = {NULL, 0};
+    PONY_ABA_PROTECTED_PTR(T) ret = {NULL, 0};
     _InterlockedCompareExchange128((LONGLONG*)ptr, 0, 0, (LONGLONG*)&ret);
     return ret;
   }
 
   template <typename T>
-  inline void big_store(PONY_ABA_PROTECTED(T)* ptr, PONY_ABA_PROTECTED(T) val)
+  inline void big_store(PONY_ABA_PROTECTED_PTR(T)* ptr,
+    PONY_ABA_PROTECTED_PTR(T) val)
   {
-    PONY_ABA_PROTECTED(T) tmp;
+    PONY_ABA_PROTECTED_PTR(T) tmp;
     tmp.object = ptr->object;
     tmp.counter = ptr->counter;
     while(!_InterlockedCompareExchange128((LONGLONG*)ptr,
diff --git a/src/libponyrt/asio/epoll.c b/src/libponyrt/asio/epoll.c
index a2abae0d78..8443e79a28 100644
--- a/src/libponyrt/asio/epoll.c
+++ b/src/libponyrt/asio/epoll.c
@@ -239,6 +239,7 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch)
   close(b->wakeup);
   ponyint_messageq_destroy(&b->q);
   POOL_FREE(asio_backend_t, b);
+  pony_unregister_thread();
   return NULL;
 }
 
diff --git a/src/libponyrt/asio/iocp.c b/src/libponyrt/asio/iocp.c
index aaac0e5fe6..f5672da875 100644
--- a/src/libponyrt/asio/iocp.c
+++ b/src/libponyrt/asio/iocp.c
@@ -229,6 +229,7 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch)
   CloseHandle(b->wakeup);
   ponyint_messageq_destroy(&b->q);
   POOL_FREE(asio_backend_t, b);
+  pony_unregister_thread();
   return NULL;
 }
 
diff --git a/src/libponyrt/asio/kqueue.c b/src/libponyrt/asio/kqueue.c
index 19f5659ad4..794ee26f69 100644
--- a/src/libponyrt/asio/kqueue.c
+++ b/src/libponyrt/asio/kqueue.c
@@ -187,6 +187,7 @@ DECLARE_THREAD_FN(ponyint_asio_backend_dispatch)
 
   ponyint_messageq_destroy(&b->q);
   POOL_FREE(asio_backend_t, b);
+  pony_unregister_thread();
   return NULL;
 }
 
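The renamed PONY_ABA_PROTECTED_PTR/PONY_ATOMIC_ABA_PROTECTED_PTR macros package a pointer together with a modification counter so both can be swapped in one double-word CAS. Below is a minimal standalone sketch of that pattern, not the runtime's own code: node_t, stack_top and the use of the GCC/Clang __atomic builtins (which need -mcx16 or libatomic for 16-byte operands on x86-64) are illustrative assumptions.

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>

    typedef struct node_t
    {
      struct node_t* next;
      int value;
    } node_t;

    // A pointer protected against ABA by a version counter. The pair is
    // 16-byte aligned so both fields can be exchanged by one double-width CAS
    // (cmpxchg16b on x86-64, reached through the __atomic builtins below).
    typedef struct aba_ptr_t
    {
      node_t* object;
      uintptr_t counter;
    } aba_ptr_t;

    static _Alignas(16) aba_ptr_t stack_top;

    static void stack_push(node_t* node)
    {
      aba_ptr_t cmp, xchg;
      __atomic_load(&stack_top, &cmp, __ATOMIC_RELAXED);

      do
      {
        node->next = cmp.object;
        xchg.object = node;
        // Bumping the counter on every successful swap is what defeats ABA:
        // even if the same pointer value reappears, the counter won't match.
        xchg.counter = cmp.counter + 1;
      } while(!__atomic_compare_exchange(&stack_top, &cmp, &xchg, true,
        __ATOMIC_RELEASE, __ATOMIC_RELAXED));
    }

    static node_t* stack_pop(void)
    {
      aba_ptr_t cmp, xchg;
      __atomic_load(&stack_top, &cmp, __ATOMIC_ACQUIRE);

      do
      {
        if(cmp.object == NULL)
          return NULL;

        // Safe only because nodes are never unmapped while the stack is in
        // use, which is also what the pool relies on for its own lists.
        xchg.object = cmp.object->next;
        xchg.counter = cmp.counter + 1;
      } while(!__atomic_compare_exchange(&stack_top, &cmp, &xchg, true,
        __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));

      return cmp.object;
    }
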
diff --git a/src/libponyrt/mem/pool.c b/src/libponyrt/mem/pool.c
index aa2b5f393b..70ce21cd0f 100644
--- a/src/libponyrt/mem/pool.c
+++ b/src/libponyrt/mem/pool.c
@@ -54,23 +54,47 @@ typedef struct pool_central_t
   struct pool_central_t* central;
 } pool_central_t;
 
+PONY_ABA_PROTECTED_PTR_DECLARE(pool_central_t)
+
 /// A per-size global list of free lists header.
 typedef struct pool_global_t
 {
   size_t size;
   size_t count;
+#ifdef PLATFORM_IS_X86
+  PONY_ATOMIC_ABA_PROTECTED_PTR(pool_central_t) central;
+#else
   PONY_ATOMIC(pool_central_t*) central;
-  PONY_ATOMIC(size_t) ticket;
-  PONY_ATOMIC(size_t) waiting_for;
+#endif
 } pool_global_t;
 
-/// An item on a thread-local list of free blocks.
-typedef struct pool_block_t
+typedef struct pool_block_t pool_block_t;
+
+PONY_ABA_PROTECTED_PTR_DECLARE(pool_block_t)
+
+/// An item on either a thread-local or a global list of free blocks.
+struct pool_block_t
 {
-  struct pool_block_t* prev;
-  struct pool_block_t* next;
+#ifdef PLATFORM_IS_X86
+  union
+  {
+    struct
+    {
+      pool_block_t* prev;
+      pool_block_t* next;
+    };
+    PONY_ATOMIC_ABA_PROTECTED_PTR(pool_block_t) global;
+  };
+#else
+  pool_block_t* prev;
+  union
+  {
+    pool_block_t* next;
+    PONY_ATOMIC(pool_block_t*) global;
+  };
+#endif
   size_t size;
-} pool_block_t;
+};
 
 /// A thread local list of free blocks header.
 typedef struct pool_block_header_t
@@ -82,24 +106,30 @@ typedef struct pool_block_header_t
 
 static pool_global_t pool_global[POOL_COUNT] =
 {
-  {POOL_MIN << 0, POOL_MAX / (POOL_MIN << 0), NULL, 0, 0},
-  {POOL_MIN << 1, POOL_MAX / (POOL_MIN << 1), NULL, 0, 0},
-  {POOL_MIN << 2, POOL_MAX / (POOL_MIN << 2), NULL, 0, 0},
-  {POOL_MIN << 3, POOL_MAX / (POOL_MIN << 3), NULL, 0, 0},
-  {POOL_MIN << 4, POOL_MAX / (POOL_MIN << 4), NULL, 0, 0},
-  {POOL_MIN << 5, POOL_MAX / (POOL_MIN << 5), NULL, 0, 0},
-  {POOL_MIN << 6, POOL_MAX / (POOL_MIN << 6), NULL, 0, 0},
-  {POOL_MIN << 7, POOL_MAX / (POOL_MIN << 7), NULL, 0, 0},
-  {POOL_MIN << 8, POOL_MAX / (POOL_MIN << 8), NULL, 0, 0},
-  {POOL_MIN << 9, POOL_MAX / (POOL_MIN << 9), NULL, 0, 0},
-  {POOL_MIN << 10, POOL_MAX / (POOL_MIN << 10), NULL, 0, 0},
-  {POOL_MIN << 11, POOL_MAX / (POOL_MIN << 11), NULL, 0, 0},
-  {POOL_MIN << 12, POOL_MAX / (POOL_MIN << 12), NULL, 0, 0},
-  {POOL_MIN << 13, POOL_MAX / (POOL_MIN << 13), NULL, 0, 0},
-  {POOL_MIN << 14, POOL_MAX / (POOL_MIN << 14), NULL, 0, 0},
-  {POOL_MIN << 15, POOL_MAX / (POOL_MIN << 15), NULL, 0, 0},
+  {POOL_MIN << 0, POOL_MAX / (POOL_MIN << 0), {{NULL, 0}}},
+  {POOL_MIN << 1, POOL_MAX / (POOL_MIN << 1), {{NULL, 0}}},
+  {POOL_MIN << 2, POOL_MAX / (POOL_MIN << 2), {{NULL, 0}}},
+  {POOL_MIN << 3, POOL_MAX / (POOL_MIN << 3), {{NULL, 0}}},
+  {POOL_MIN << 4, POOL_MAX / (POOL_MIN << 4), {{NULL, 0}}},
+  {POOL_MIN << 5, POOL_MAX / (POOL_MIN << 5), {{NULL, 0}}},
+  {POOL_MIN << 6, POOL_MAX / (POOL_MIN << 6), {{NULL, 0}}},
+  {POOL_MIN << 7, POOL_MAX / (POOL_MIN << 7), {{NULL, 0}}},
+  {POOL_MIN << 8, POOL_MAX / (POOL_MIN << 8), {{NULL, 0}}},
+  {POOL_MIN << 9, POOL_MAX / (POOL_MIN << 9), {{NULL, 0}}},
+  {POOL_MIN << 10, POOL_MAX / (POOL_MIN << 10), {{NULL, 0}}},
+  {POOL_MIN << 11, POOL_MAX / (POOL_MIN << 11), {{NULL, 0}}},
+  {POOL_MIN << 12, POOL_MAX / (POOL_MIN << 12), {{NULL, 0}}},
+  {POOL_MIN << 13, POOL_MAX / (POOL_MIN << 13), {{NULL, 0}}},
+  {POOL_MIN << 14, POOL_MAX / (POOL_MIN << 14), {{NULL, 0}}},
+  {POOL_MIN << 15, POOL_MAX / (POOL_MIN << 15), {{NULL, 0}}},
 };
 
+#ifdef PLATFORM_IS_X86
+static PONY_ATOMIC_ABA_PROTECTED_PTR(pool_block_t) pool_block_global;
+#else
+static PONY_ATOMIC(pool_block_t*) pool_block_global;
+#endif
+
 static __pony_thread_local pool_local_t pool_local[POOL_COUNT];
 static __pony_thread_local pool_block_header_t pool_block_header;
 
@@ -384,7 +414,211 @@ static void pool_block_insert(pool_block_t* block)
     next->prev = block;
 }
 
-static void* pool_alloc_pages(size_t size)
+static void pool_block_push(pool_block_t* block)
+{
+#ifdef PLATFORM_IS_X86
+  PONY_ABA_PROTECTED_PTR(pool_block_t)* dst = NULL;
+  PONY_ABA_PROTECTED_PTR(pool_block_t) cmp;
+  PONY_ABA_PROTECTED_PTR(pool_block_t) xchg;
+  cmp.object = pool_block_global.object;
+  cmp.counter = pool_block_global.counter;
+#else
+  PONY_ATOMIC(pool_block_t*)* dst;
+  pool_block_t* next = NULL;
+#endif
+  pool_block_t* pos;
+
+  do
+  {
+#ifdef PLATFORM_IS_X86
+    PONY_ABA_PROTECTED_PTR(pool_block_t) cmp_prev = {{NULL, 0}};
+
+    // If dst is &pool_block_global, we did a CAS iteration and failed to get
+    // pool_block_global. cmp already holds the right value, don't reload in
+    // that case.
+    if(dst != &pool_block_global)
+    {
+      cmp = bigatomic_load_explicit(&pool_block_global, memory_order_acquire);
+# ifdef USE_VALGRIND
+      ANNOTATE_HAPPENS_AFTER(&pool_block_global);
+# endif
+    }
+
+    pos = cmp.object;
+#else
+    pos = atomic_load_explicit(&pool_block_global, memory_order_acquire);
+# ifdef USE_VALGRIND
+    ANNOTATE_HAPPENS_AFTER(&pool_block_global);
+# endif
+#endif
+
+    // Find an insertion position. The list is sorted and stays sorted after an
+    // insertion.
+    pool_block_t* prev = NULL;
+    while((pos != NULL) && (block->size > pos->size))
+    {
+#ifdef PLATFORM_IS_X86
+      cmp_prev = cmp;
+      cmp = bigatomic_load_explicit(&pos->global, memory_order_acquire);
+      prev = pos;
+      pos = cmp.object;
+#else
+      prev = pos;
+      pos = atomic_load_explicit(&pos->global, memory_order_acquire);
+#endif
+#ifdef USE_VALGRIND
+      ANNOTATE_HAPPENS_AFTER(&pos->global);
+#endif
+    }
+
+    if(prev == NULL)
+    {
+      // Insert at the beginning.
+#ifdef PLATFORM_IS_X86
+      PONY_ABA_PROTECTED_PTR(pool_block_t)* ptr = &block->global;
+      ptr->object = NULL;
+      ptr->counter = cmp_prev.counter + 1;
+#else
+      atomic_store_explicit(&block->global, NULL, memory_order_relaxed);
+#endif
+      dst = &pool_block_global;
+    } else if(pos == NULL) {
+      // Insert at the end.
+#ifdef PLATFORM_IS_X86
+      PONY_ABA_PROTECTED_PTR(pool_block_t)* ptr = &block->global;
+      ptr->object = NULL;
+      ptr->counter = cmp.counter + 1;
+#else
+      atomic_store_explicit(&block->global, NULL, memory_order_relaxed);
+#endif
+      dst = &prev->global;
+    } else {
+#ifdef PLATFORM_IS_X86
+      cmp = bigatomic_load_explicit(&pos->global, memory_order_relaxed);
+      PONY_ABA_PROTECTED_PTR(pool_block_t)* ptr = &block->global;
+      ptr->object = cmp.object;
+      ptr->counter = cmp.counter + 1;
+#else
+      next = atomic_load_explicit(&pos->global, memory_order_relaxed);
+      atomic_store_explicit(&block->global, next, memory_order_relaxed);
+#endif
+      dst = &pos->global;
+    }
+
+    // If the compare_exchange fails, somebody else got the node and we have to
+    // look for another position to insert the block.
+
+#ifdef USE_VALGRIND
+    ANNOTATE_HAPPENS_BEFORE(dst);
+#endif
+#ifdef PLATFORM_IS_X86
+    xchg.object = block;
+    xchg.counter = cmp.counter + 1;
+  } while(!bigatomic_compare_exchange_weak_explicit(dst, &cmp, xchg,
+    memory_order_release, memory_order_relaxed));
+#else
+  } while(!atomic_compare_exchange_weak_explicit(dst, &next, block,
+    memory_order_release, memory_order_relaxed));
+#endif
+}
+
+static void* pool_block_pull(size_t size)
+{
+  pool_block_t* block;
+#ifdef PLATFORM_IS_X86
+  PONY_ABA_PROTECTED_PTR(pool_block_t)* dst = NULL;
+  PONY_ABA_PROTECTED_PTR(pool_block_t) cmp;
+  PONY_ABA_PROTECTED_PTR(pool_block_t) xchg;
+  cmp.object = pool_block_global.object;
+
+  // Fast bailout path without additional branching.
+  if(cmp.object == NULL)
+    return NULL;
+
+  cmp.counter = pool_block_global.counter;
+#else
+  PONY_ATOMIC(pool_block_t*)* dst;
+  pool_block_t* next;
+#endif
+  pool_block_t* prev = NULL;
+
+  do
+  {
+#ifdef PLATFORM_IS_X86
+    // If dst is &pool_block_global, we did a CAS iteration and failed to get
+    // pool_block_global. cmp already holds the right value, don't reload in
+    // that case.
+    if(dst != &pool_block_global)
+      cmp = bigatomic_load_explicit(&pool_block_global, memory_order_relaxed);
+
+    block = cmp.object;
+#else
+    block = atomic_load_explicit(&pool_block_global, memory_order_relaxed);
+#endif
+    if(block == NULL)
+      return NULL;
+
+    atomic_thread_fence(memory_order_acquire);
+#ifdef USE_VALGRIND
+    ANNOTATE_HAPPENS_AFTER(&pool_block_global);
+#endif
+
+    // Find a big enough block. The list is sorted.
+    while((block != NULL) && (size > block->size))
+    {
+#ifdef PLATFORM_IS_X86
+      prev = block;
+      cmp = bigatomic_load_explicit(&block->global, memory_order_acquire);
+      block = cmp.object;
+#else
+      prev = block;
+      block = atomic_load_explicit(&block->global, memory_order_acquire);
+#endif
+#ifdef USE_VALGRIND
+      ANNOTATE_HAPPENS_AFTER(&block->global);
+#endif
+    }
+
+    // No suitable block.
+    if(block == NULL)
+      return NULL;
+
+#ifdef PLATFORM_IS_X86
+    xchg = bigatomic_load_explicit(&block->global, memory_order_relaxed);
+    xchg.counter = cmp.counter + 1;
+#else
+    next = atomic_load_explicit(&block->global, memory_order_relaxed);
+#endif
+    dst = (prev == NULL) ? &pool_block_global : &prev->global;
+
+    // If the compare_exchange fails, somebody else got the node and we have to
+    // look for another block.
+#ifdef PLATFORM_IS_X86
+  } while(!bigatomic_compare_exchange_weak_explicit(dst, &cmp, xchg,
+    memory_order_acquire, memory_order_relaxed));
+#else
+  } while(!atomic_compare_exchange_weak_explicit(dst, &block, next,
+    memory_order_acquire, memory_order_relaxed));
+#endif
+
+#ifdef USE_VALGRIND
+  ANNOTATE_HAPPENS_AFTER(dst);
+#endif
+
+  pony_assert(size <= block->size);
+
+  if(size == block->size)
+    return block;
+
+  size_t rem = block->size - size;
+  block->size = rem;
+  pool_block_header.total_size += rem;
+  pool_block_insert(block);
+
+  return (char*)block + rem;
+}
+
+static void* pool_block_get(size_t size)
 {
   if(pool_block_header.total_size >= size)
   {
@@ -426,6 +660,16 @@ static void* pool_alloc_pages(size_t size)
     }
   }
 
+  return pool_block_pull(size);
+}
+
+static void* pool_alloc_pages(size_t size)
+{
+  void* p = pool_block_get(size);
+
+  if(p != NULL)
+    return p;
+
   // We have no free blocks big enough.
   if(size >= POOL_MMAP)
     return ponyint_virt_alloc(size);
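One detail of pool_block_pull above that is easy to miss: when a block is larger than the request, the allocation is carved off the tail ((char*)block + rem), so the header at the front keeps describing the remainder, which is then reinserted into the thread-local list. A standalone sketch of just that arithmetic, with illustrative names and no atomics:

    #include <assert.h>
    #include <stddef.h>
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct block_t
    {
      struct block_t* prev;
      struct block_t* next;
      size_t size; // Total size of the block, header included.
    } block_t;

    // Serve `size` bytes from the tail of `block`. The header at the front
    // stays valid and now describes only the remainder.
    static void* block_split(block_t* block, size_t size)
    {
      assert(size <= block->size);

      if(size == block->size)
        return block; // Exact fit: hand out the whole block, header and all.

      size_t rem = block->size - size;
      block->size = rem;
      return (char*)block + rem;
    }

    int main(void)
    {
      block_t* b = malloc(1 << 12);
      b->prev = NULL;
      b->next = NULL;
      b->size = 1 << 12;

      void* p = block_split(b, 1 << 10);
      printf("remainder %zu bytes at %p, allocation at %p\n",
        b->size, (void*)b, p);

      free(b);
      return 0;
    }
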
@@ -466,74 +710,86 @@ static void pool_push(pool_local_t* thread, pool_global_t* global)
   thread->pool = NULL;
   thread->length = 0;
 
-  pony_assert(p->length == global->count);
+  pony_assert((p->length > 0) && (p->length <= global->count));
   TRACK_PUSH((pool_item_t*)p, p->length, global->size);
 
-  size_t my_ticket = atomic_fetch_add_explicit(&global->ticket, 1,
-    memory_order_relaxed);
-
-  while(my_ticket != atomic_load_explicit(&global->waiting_for,
-    memory_order_relaxed))
-    ponyint_cpu_relax();
+  pool_central_t* top;
+#ifdef PLATFORM_IS_X86
+  PONY_ABA_PROTECTED_PTR(pool_central_t) cmp;
+  PONY_ABA_PROTECTED_PTR(pool_central_t) xchg;
+  cmp.object = global->central.object;
+  cmp.counter = global->central.counter;
 
-  atomic_thread_fence(memory_order_acquire);
-#ifdef USE_VALGRIND
-  ANNOTATE_HAPPENS_AFTER(&global->waiting_for);
+  xchg.object = p;
+#else
+  top = atomic_load_explicit(&global->central, memory_order_relaxed);
 #endif
 
-  pool_central_t* top = atomic_load_explicit(&global->central,
-    memory_order_relaxed);
-  p->central = top;
+  do
+  {
+#ifdef PLATFORM_IS_X86
+    top = cmp.object;
+    xchg.counter = cmp.counter + 1;
+#endif
+    p->central = top;
 
-  atomic_store_explicit(&global->central, p, memory_order_relaxed);
 #ifdef USE_VALGRIND
-  ANNOTATE_HAPPENS_BEFORE(&global->waiting_for);
+    ANNOTATE_HAPPENS_BEFORE(&global->central);
+#endif
+#ifdef PLATFORM_IS_X86
+  } while(!bigatomic_compare_exchange_weak_explicit(&global->central, &cmp,
+    xchg, memory_order_release, memory_order_relaxed));
+#else
+  } while(!atomic_compare_exchange_weak_explicit(&global->central, &top, p,
+    memory_order_release, memory_order_relaxed));
 #endif
-  atomic_store_explicit(&global->waiting_for, my_ticket + 1,
-    memory_order_release);
 }
 
 static pool_item_t* pool_pull(pool_local_t* thread, pool_global_t* global)
 {
-  // If we believe the global free list is empty, bailout immediately without
-  // taking a ticket to avoid unnecessary contention.
-  if(atomic_load_explicit(&global->central, memory_order_relaxed) == NULL)
-    return NULL;
-
-  size_t my_ticket = atomic_fetch_add_explicit(&global->ticket, 1,
-    memory_order_relaxed);
-
-  while(my_ticket != atomic_load_explicit(&global->waiting_for,
-    memory_order_relaxed))
-    ponyint_cpu_relax();
-
-  atomic_thread_fence(memory_order_acquire);
-#ifdef USE_VALGRIND
-  ANNOTATE_HAPPENS_AFTER(&global->waiting_for);
+  pool_central_t* top;
+#ifdef PLATFORM_IS_X86
+  PONY_ABA_PROTECTED_PTR(pool_central_t) cmp;
+  PONY_ABA_PROTECTED_PTR(pool_central_t) xchg;
+  cmp.object = global->central.object;
+  cmp.counter = global->central.counter;
+#else
+  top = atomic_load_explicit(&global->central, memory_order_relaxed);
 #endif
+  pool_central_t* next;
 
-  pool_central_t* top = atomic_load_explicit(&global->central,
-    memory_order_relaxed);
-
-  if(top == NULL)
+  do
   {
-    atomic_store_explicit(&global->waiting_for, my_ticket + 1,
-      memory_order_relaxed);
-    return NULL;
-  }
+#ifdef PLATFORM_IS_X86
+    top = cmp.object;
+#endif
+    if(top == NULL)
+      return NULL;
 
-  pool_central_t* next = top->central;
+    atomic_thread_fence(memory_order_acquire);
+#ifdef USE_VALGRIND
+    ANNOTATE_HAPPENS_AFTER(&global->central);
+#endif
+    next = top->central;
+#ifdef PLATFORM_IS_X86
+    xchg.object = next;
+    xchg.counter = cmp.counter + 1;
+  } while(!bigatomic_compare_exchange_weak_explicit(&global->central, &cmp,
+    xchg, memory_order_acquire, memory_order_relaxed));
+#else
+  } while(!atomic_compare_exchange_weak_explicit(&global->central, &top, next,
+    memory_order_acquire, memory_order_relaxed));
+#endif
 
-  atomic_store_explicit(&global->central, next, memory_order_relaxed);
+  // We need to synchronise twice on global->central to make sure we see every
+  // previous write to the memory we're going to use before we use it.
 #ifdef USE_VALGRIND
-  ANNOTATE_HAPPENS_BEFORE(&global->waiting_for);
+  ANNOTATE_HAPPENS_AFTER(&global->central);
 #endif
-  atomic_store_explicit(&global->waiting_for, my_ticket + 1,
-    memory_order_release);
 
   pool_item_t* p = (pool_item_t*)top;
-  pony_assert(top->length == global->count);
+  pony_assert((top->length > 0) && (top->length <= global->count));
   TRACK_PULL(p, top->length, global->size);
 
   thread->pool = p->next;
@@ -683,6 +939,45 @@ void ponyint_pool_free_size(size_t size, void* p)
 #endif
 }
 
+void ponyint_pool_thread_cleanup()
+{
+  for(size_t index = 0; index < POOL_COUNT; index++)
+  {
+    pool_local_t* thread = &pool_local[index];
+    if(thread->pool != NULL)
+    {
+      pool_global_t* global = &pool_global[index];
+      pool_push(thread, global);
+    }
+
+    pony_assert(thread->end >= thread->start);
+    size_t block_length = thread->end - thread->start;
+    if(block_length >= sizeof(pool_block_t))
+    {
+      pool_block_t* block = (pool_block_t*)thread->start;
+      thread->start = NULL;
+      thread->end = NULL;
+
+      block->prev = NULL;
+      block->next = NULL;
+      block->size = block_length;
+      pool_block_push(block);
+    }
+  }
+
+  pool_block_t* block = pool_block_header.head;
+  while(block != NULL)
+  {
+    pool_block_t* next = block->next;
+    pool_block_remove(block);
+    pool_block_push(block);
+    block = next;
+  }
+
+  pool_block_header.total_size = 0;
+  pool_block_header.largest_size = 0;
+}
+
 size_t ponyint_pool_index(size_t size)
 {
   if(size <= POOL_MIN)
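ponyint_pool_thread_cleanup() gives every thread a way to hand its local free lists and block cache back to the global lists before it exits; the scheduler shutdown, ASIO backend and pony_unregister_thread() paths in this patch all call it. A hedged sketch of that contract for a plain pthread worker follows (the include path and the 128-byte request are illustrative, and the program must be linked against libponyrt):

    #include <pthread.h>
    #include <stddef.h>

    #include "mem/pool.h" // libponyrt internal header; exact path depends on the build.

    static void* worker(void* arg)
    {
      (void)arg;

      // Allocate and free through the size-class interface; both calls touch
      // the thread-local free lists first.
      size_t index = ponyint_pool_index(128);
      void* p = ponyint_pool_alloc(index);
      ponyint_pool_free(index, p);

      // Push the thread-local caches back to the global lists so the memory
      // is reusable by other threads after this one is gone.
      ponyint_pool_thread_cleanup();
      return NULL;
    }

    int main(void)
    {
      pthread_t t;
      pthread_create(&t, NULL, worker, NULL);
      pthread_join(&t, NULL);
      return 0;
    }
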
diff --git a/src/libponyrt/mem/pool.h b/src/libponyrt/mem/pool.h
index 06741f2c1a..08fd675fe3 100644
--- a/src/libponyrt/mem/pool.h
+++ b/src/libponyrt/mem/pool.h
@@ -27,6 +27,8 @@ void ponyint_pool_free(size_t index, void* p);
 __pony_spec_malloc__(void* ponyint_pool_alloc_size(size_t size));
 void ponyint_pool_free_size(size_t size, void* p);
 
+void ponyint_pool_thread_cleanup();
+
 size_t ponyint_pool_index(size_t size);
 size_t ponyint_pool_size(size_t index);
 
diff --git a/src/libponyrt/pony.h b/src/libponyrt/pony.h
index 83b04740b0..0ae47a3189 100644
--- a/src/libponyrt/pony.h
+++ b/src/libponyrt/pony.h
@@ -366,6 +366,14 @@ PONY_API int pony_start(bool library, bool language_features);
  */
 PONY_API void pony_register_thread();
 
+/** Unregisters a non-scheduler thread.
+ *
+ * Cleans up the runtime context allocated when registering a thread with
+ * pony_register_thread(). This should never be called from a thread owned by
+ * the Pony runtime.
+ */
+PONY_API void pony_unregister_thread();
+
 /** Signals that the pony runtime may terminate.
  *
  * This only needs to be called if pony_start() was called with library set to
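With pony_unregister_thread() exposed, an embedder's own threads can release the context they got from pony_register_thread() instead of leaking it. A minimal sketch of the intended pairing, assuming the runtime has already been started with pony_start() elsewhere and that the program links against libponyrt; the actor interaction is elided:

    #include <pony.h>
    #include <pthread.h>

    static void* embedder_thread(void* arg)
    {
      (void)arg;

      // Register so pony_ctx() has a context to hand out on this thread.
      pony_register_thread();
      pony_ctx_t* ctx = pony_ctx();

      // ... interact with the runtime, e.g. send messages to an actor ...
      (void)ctx;

      // Release the context and push the thread's pool caches back to the
      // global lists.
      pony_unregister_thread();
      return NULL;
    }

    int main(void)
    {
      pthread_t t;
      pthread_create(&t, NULL, embedder_thread, NULL);
      pthread_join(&t, NULL);
      return 0;
    }
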
diff --git a/src/libponyrt/sched/mpmcq.c b/src/libponyrt/sched/mpmcq.c
index 66fef64f2a..6706156e3b 100644
--- a/src/libponyrt/sched/mpmcq.c
+++ b/src/libponyrt/sched/mpmcq.c
@@ -40,10 +40,8 @@ void ponyint_mpmcq_init(mpmcq_t* q)
 
   atomic_store_explicit(&q->head, node, memory_order_relaxed);
 #ifdef PLATFORM_IS_X86
-  PONY_ABA_PROTECTED(mpmcq_node_t*) tail;
-  tail.object = node;
-  tail.counter = 0;
-  bigatomic_store_explicit(&q->tail, tail, memory_order_relaxed);
+  q->tail.object = node;
+  q->tail.counter = 0;
 #else
   atomic_store_explicit(&q->tail, node, memory_order_relaxed);
 #endif
@@ -53,11 +51,8 @@ void ponyint_mpmcq_destroy(mpmcq_t* q)
 {
   atomic_store_explicit(&q->head, NULL, memory_order_relaxed);
 #ifdef PLATFORM_IS_X86
-  PONY_ABA_PROTECTED(mpmcq_node_t*) tail = bigatomic_load_explicit(&q->tail,
-    memory_order_relaxed);
-  node_free(tail.object);
-  tail.object = NULL;
-  bigatomic_store_explicit(&q->tail, tail, memory_order_relaxed);
+  node_free(q->tail.object);
+  q->tail.object = NULL;
 #else
   mpmcq_node_t* tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
   node_free(tail);
@@ -93,8 +88,8 @@ void ponyint_mpmcq_push_single(mpmcq_t* q, void* data)
 void* ponyint_mpmcq_pop(mpmcq_t* q)
 {
 #ifdef PLATFORM_IS_X86
-  PONY_ABA_PROTECTED(mpmcq_node_t*) cmp;
-  PONY_ABA_PROTECTED(mpmcq_node_t*) xchg;
+  PONY_ABA_PROTECTED_PTR(mpmcq_node_t) cmp;
+  PONY_ABA_PROTECTED_PTR(mpmcq_node_t) xchg;
   mpmcq_node_t* tail;
   // Load the tail non-atomically. If object and counter are out of sync, we'll
   // do an additional CAS iteration which isn't less efficient than doing an
diff --git a/src/libponyrt/sched/mpmcq.h b/src/libponyrt/sched/mpmcq.h
index 551bd8655f..afd1bea5cd 100644
--- a/src/libponyrt/sched/mpmcq.h
+++ b/src/libponyrt/sched/mpmcq.h
@@ -12,13 +12,13 @@ PONY_EXTERN_C_BEGIN
 
 typedef struct mpmcq_node_t mpmcq_node_t;
 
-PONY_ABA_PROTECTED_DECLARE(mpmcq_node_t*)
+PONY_ABA_PROTECTED_PTR_DECLARE(mpmcq_node_t)
 
 typedef struct mpmcq_t
 {
   alignas(64) PONY_ATOMIC(mpmcq_node_t*) head;
 #ifdef PLATFORM_IS_X86
-  PONY_ATOMIC_ABA_PROTECTED(mpmcq_node_t*) tail;
+  PONY_ATOMIC_ABA_PROTECTED_PTR(mpmcq_node_t) tail;
 #else
   // On ARM, the ABA problem is dealt with by the hardware with
   // LoadLinked/StoreConditional instructions.
diff --git a/src/libponyrt/sched/scheduler.c b/src/libponyrt/sched/scheduler.c
index fb8e5f57e1..572bbc4eb4 100644
--- a/src/libponyrt/sched/scheduler.c
+++ b/src/libponyrt/sched/scheduler.c
@@ -336,6 +336,7 @@ static DECLARE_THREAD_FN(run_thread)
   this_scheduler = sched;
   ponyint_cpu_affinity(sched->cpu);
   run(sched);
+  ponyint_pool_thread_cleanup();
   return 0;
 }
 
@@ -453,7 +454,7 @@ uint32_t ponyint_sched_cores()
   return scheduler_count;
 }
 
-void pony_register_thread()
+PONY_API void pony_register_thread()
 {
   if(this_scheduler != NULL)
     return;
@@ -464,6 +465,17 @@ void pony_register_thread()
   this_scheduler->tid = ponyint_thread_self();
 }
 
+PONY_API void pony_unregister_thread()
+{
+  if(this_scheduler == NULL)
+    return;
+
+  POOL_FREE(scheduler_t, this_scheduler);
+  this_scheduler = NULL;
+
+  ponyint_pool_thread_cleanup();
+}
+
 PONY_API pony_ctx_t* pony_ctx()
 {
   return &this_scheduler->ctx;