Skip to content

Commit

Permalink
Added Thread\Barrier (#5380)
Browse files Browse the repository at this point in the history
* refactor thread barrier

* Added Swoole\Thread\Barrier

* remove test code

* Add protection code
  • Loading branch information
matyhtf authored Jun 24, 2024
1 parent 565b560 commit ce934b8
Show file tree
Hide file tree
Showing 15 changed files with 305 additions and 53 deletions.
1 change: 1 addition & 0 deletions ext-src/php_swoole.cc
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ PHP_MINIT_FUNCTION(swoole) {
php_swoole_thread_minit(module_number);
php_swoole_thread_atomic_minit(module_number);
php_swoole_thread_lock_minit(module_number);
php_swoole_thread_barrier_minit(module_number);
php_swoole_thread_queue_minit(module_number);
php_swoole_thread_map_minit(module_number);
php_swoole_thread_arraylist_minit(module_number);
Expand Down
1 change: 1 addition & 0 deletions ext-src/php_swoole_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ void php_swoole_name_resolver_minit(int module_number);
void php_swoole_thread_minit(int module_number);
void php_swoole_thread_atomic_minit(int module_number);
void php_swoole_thread_lock_minit(int module_number);
void php_swoole_thread_barrier_minit(int module_number);
void php_swoole_thread_queue_minit(int module_number);
void php_swoole_thread_map_minit(int module_number);
void php_swoole_thread_arraylist_minit(int module_number);
Expand Down
8 changes: 8 additions & 0 deletions ext-src/stubs/php_swoole_thread_barrier.stub.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php
namespace Swoole\Thread {
class Barrier {
public function __construct(int $count) {}
public function wait(): void {}
public function __wakeup(): void {}
}
}
11 changes: 11 additions & 0 deletions ext-src/stubs/php_swoole_thread_barrier_arginfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: eac62993bfb3fbd87587a8e6997c16bca7dc5dbc */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Swoole_Thread_Barrier___construct, 0, 0, 1)
ZEND_ARG_TYPE_INFO(0, count, IS_LONG, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Swoole_Thread_Barrier_wait, 0, 0, IS_VOID, 0)
ZEND_END_ARG_INFO()

#define arginfo_class_Swoole_Thread_Barrier___wakeup arginfo_class_Swoole_Thread_Barrier_wait
4 changes: 0 additions & 4 deletions ext-src/swoole_thread_atomic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,15 @@ static PHP_METHOD(swoole_thread_atomic, set);
static PHP_METHOD(swoole_thread_atomic, cmpset);
static PHP_METHOD(swoole_thread_atomic, wait);
static PHP_METHOD(swoole_thread_atomic, wakeup);
#ifdef SW_THREAD
static PHP_METHOD(swoole_thread_atomic, __wakeup);
#endif

static PHP_METHOD(swoole_thread_atomic_long, __construct);
static PHP_METHOD(swoole_thread_atomic_long, add);
static PHP_METHOD(swoole_thread_atomic_long, sub);
static PHP_METHOD(swoole_thread_atomic_long, get);
static PHP_METHOD(swoole_thread_atomic_long, set);
static PHP_METHOD(swoole_thread_atomic_long, cmpset);
#ifdef SW_THREAD
static PHP_METHOD(swoole_thread_atomic_long, __wakeup);
#endif
SW_EXTERN_C_END

// clang-format off
Expand Down
153 changes: 153 additions & 0 deletions ext-src/swoole_thread_barrier.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
+----------------------------------------------------------------------+
| Swoole |
+----------------------------------------------------------------------+
| This source file is subject to version 2.0 of the Apache license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.apache.org/licenses/LICENSE-2.0.html |
| If you did not receive a copy of the Apache2.0 license and are unable|
| to obtain it through the world-wide-web, please send a note to |
| license@swoole.com so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: Tianfeng Han <rango@swoole.com> |
+----------------------------------------------------------------------+
*/

#include "php_swoole_private.h"
#include "php_swoole_thread.h"
#include "swoole_memory.h"
#include "swoole_lock.h"

#ifdef SW_THREAD

BEGIN_EXTERN_C()
#include "stubs/php_swoole_thread_barrier_arginfo.h"
END_EXTERN_C()

using swoole::Barrier;

static zend_class_entry *swoole_thread_barrier_ce;
static zend_object_handlers swoole_thread_barrier_handlers;

struct BarrierResource : public ThreadResource {
Barrier barrier_;
BarrierResource(int count) : ThreadResource() {
barrier_.init(false, count);
}
void wait() {
barrier_.wait();
}
~BarrierResource() {
barrier_.destroy();
}
};

struct BarrierObject {
BarrierResource *barrier;
zend_object std;
};

static sw_inline BarrierObject *php_swoole_thread_barrier_fetch_object(zend_object *obj) {
return (BarrierObject *) ((char *) obj - swoole_thread_barrier_handlers.offset);
}

static BarrierResource *php_swoole_thread_barrier_get_ptr(zval *zobject) {
return php_swoole_thread_barrier_fetch_object(Z_OBJ_P(zobject))->barrier;
}

static BarrierResource *php_swoole_thread_barrier_get_and_check_ptr(zval *zobject) {
BarrierResource *barrier = php_swoole_thread_barrier_get_ptr(zobject);
if (!barrier) {
php_swoole_fatal_error(E_ERROR, "must call constructor first");
}
return barrier;
}

static void php_swoole_thread_barrier_free_object(zend_object *object) {
BarrierObject *bo = php_swoole_thread_barrier_fetch_object(object);
zend_long resource_id = zend::object_get_long(object, ZEND_STRL("id"));
if (bo->barrier && php_swoole_thread_resource_free(resource_id, bo->barrier)) {
delete bo->barrier;
bo->barrier = nullptr;
}
zend_object_std_dtor(object);
}

static zend_object *php_swoole_thread_barrier_create_object(zend_class_entry *ce) {
BarrierObject *bo = (BarrierObject *) zend_object_alloc(sizeof(BarrierObject), ce);
zend_object_std_init(&bo->std, ce);
object_properties_init(&bo->std, ce);
bo->std.handlers = &swoole_thread_barrier_handlers;
return &bo->std;
}

SW_EXTERN_C_BEGIN
static PHP_METHOD(swoole_thread_barrier, __construct);
static PHP_METHOD(swoole_thread_barrier, wait);
static PHP_METHOD(swoole_thread_barrier, __wakeup);
SW_EXTERN_C_END

// clang-format off
static const zend_function_entry swoole_thread_barrier_methods[] =
{
PHP_ME(swoole_thread_barrier, __construct, arginfo_class_Swoole_Thread_Barrier___construct, ZEND_ACC_PUBLIC)
PHP_ME(swoole_thread_barrier, wait, arginfo_class_Swoole_Thread_Barrier_wait, ZEND_ACC_PUBLIC)
PHP_ME(swoole_thread_barrier, __wakeup, arginfo_class_Swoole_Thread_Barrier___wakeup, ZEND_ACC_PUBLIC)
PHP_FE_END
};
// clang-format on

void php_swoole_thread_barrier_minit(int module_number) {
SW_INIT_CLASS_ENTRY(swoole_thread_barrier, "Swoole\\Thread\\Barrier", nullptr, swoole_thread_barrier_methods);
zend_declare_property_long(swoole_thread_barrier_ce, ZEND_STRL("id"), 0, ZEND_ACC_PUBLIC);
SW_SET_CLASS_CLONEABLE(swoole_thread_barrier, sw_zend_class_clone_deny);
SW_SET_CLASS_UNSET_PROPERTY_HANDLER(swoole_thread_barrier, sw_zend_class_unset_property_deny);
SW_SET_CLASS_CUSTOM_OBJECT(swoole_thread_barrier,
php_swoole_thread_barrier_create_object,
php_swoole_thread_barrier_free_object,
BarrierObject,
std);
}

static PHP_METHOD(swoole_thread_barrier, __construct) {
auto bo = php_swoole_thread_barrier_fetch_object(Z_OBJ_P(ZEND_THIS));
if (bo->barrier != nullptr) {
zend_throw_error(NULL, "Constructor of %s can only be called once", SW_Z_OBJCE_NAME_VAL_P(ZEND_THIS));
RETURN_FALSE;
}

zend_long count;
ZEND_PARSE_PARAMETERS_START(1, 1)
Z_PARAM_LONG(count)
ZEND_PARSE_PARAMETERS_END();

if (count < 2) {
zend_throw_exception(
swoole_exception_ce, "The parameter $count must be greater than 1", SW_ERROR_INVALID_PARAMS);
RETURN_FALSE;
}

bo->barrier = new BarrierResource(count);
auto resource_id = php_swoole_thread_resource_insert(bo->barrier);
zend_update_property_long(swoole_thread_barrier_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("id"), resource_id);
RETURN_TRUE;
}

static PHP_METHOD(swoole_thread_barrier, wait) {
BarrierResource *barrier = php_swoole_thread_barrier_get_and_check_ptr(ZEND_THIS);
if (barrier) {
barrier->wait();
}
}

static PHP_METHOD(swoole_thread_barrier, __wakeup) {
auto bo = php_swoole_thread_barrier_fetch_object(Z_OBJ_P(ZEND_THIS));
zend_long resource_id = zend::object_get_long(ZEND_THIS, ZEND_STRL("id"));
bo->barrier = static_cast<BarrierResource *>(php_swoole_thread_resource_fetch(resource_id));
if (!bo->barrier) {
zend_throw_exception(swoole_exception_ce, EMSG_NO_RESOURCE, ECODE_NO_RESOURCE);
return;
}
}
#endif
4 changes: 0 additions & 4 deletions ext-src/swoole_thread_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ using swoole::RWLock;
static zend_class_entry *swoole_thread_lock_ce;
static zend_object_handlers swoole_thread_lock_handlers;

#ifdef SW_THREAD
struct LockResource : public ThreadResource {
Lock *lock_;
LockResource(int type) : ThreadResource() {
Expand All @@ -62,7 +61,6 @@ struct LockResource : public ThreadResource {
delete lock_;
}
};
#endif

struct LockObject {
LockResource *lock;
Expand Down Expand Up @@ -113,9 +111,7 @@ static PHP_METHOD(swoole_thread_lock, lock_read);
static PHP_METHOD(swoole_thread_lock, trylock_read);
static PHP_METHOD(swoole_thread_lock, unlock);
static PHP_METHOD(swoole_thread_lock, destroy);
#ifdef SW_THREAD
static PHP_METHOD(swoole_thread_lock, __wakeup);
#endif
SW_EXTERN_C_END

// clang-format off
Expand Down
2 changes: 1 addition & 1 deletion include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ typedef unsigned long ulong_t;
#define SW_ASSERT(e)
#define SW_ASSERT_1BYTE(v)
#endif
#define SW_START_SLEEP usleep(100000) // sleep 1s,wait fork and pthread_create
#define SW_START_SLEEP usleep(100000) // sleep 0.1s, wait fork and pthread_create

#ifdef SW_THREAD
#define SW_THREAD_LOCAL thread_local
Expand Down
19 changes: 19 additions & 0 deletions include/swoole_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,23 @@ class SpinLock : public Lock {
int trylock() override;
};
#endif

#if defined(HAVE_PTHREAD_BARRIER) && !(defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__))
#define SW_USE_PTHREAD_BARRIER
#endif

struct Barrier {
#ifdef SW_USE_PTHREAD_BARRIER
pthread_barrier_t barrier_;
pthread_barrierattr_t barrier_attr_;
bool shared_;
#else
sw_atomic_t count_;
sw_atomic_t barrier_;
#endif
void init(bool shared, int count);
void wait();
void destroy();
};

} // namespace swoole
9 changes: 2 additions & 7 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,7 @@ struct ServerGS {

sw_atomic_t spinlock;

#ifdef HAVE_PTHREAD_BARRIER
pthread_barrier_t manager_barrier;
pthread_barrierattr_t manager_barrier_attr;
#endif
Barrier manager_barrier;

ProcessPool task_workers;
ProcessPool event_workers;
Expand Down Expand Up @@ -858,9 +855,7 @@ class Server {
std::shared_ptr<std::vector<std::string>> http_index_files = nullptr;
std::shared_ptr<std::unordered_set<std::string>> http_compression_types = nullptr;

#ifdef HAVE_PTHREAD_BARRIER
pthread_barrier_t reactor_thread_barrier = {};
#endif
Barrier reactor_thread_barrier = {};

/**
* temporary directory for HTTP uploaded file.
Expand Down
63 changes: 63 additions & 0 deletions src/lock/barrier.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
+----------------------------------------------------------------------+
| Swoole |
+----------------------------------------------------------------------+
| This source file is subject to version 2.0 of the Apache license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.apache.org/licenses/LICENSE-2.0.html |
| If you did not receive a copy of the Apache2.0 license and are unable|
| to obtain it through the world-wide-web, please send a note to |
| license@swoole.com so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: Tianfeng Han <rango@swoole.com> |
+----------------------------------------------------------------------+
*/

#include "swoole.h"
#include "swoole_lock.h"

namespace swoole {

#define BARRIER_USEC 10000

void Barrier::init(bool shared, int count) {
#ifdef SW_USE_PTHREAD_BARRIER
if (shared) {
pthread_barrierattr_setpshared(&barrier_attr_, PTHREAD_PROCESS_SHARED);
pthread_barrier_init(&barrier_, &barrier_attr_, count);
} else {
pthread_barrier_init(&barrier_, nullptr, count);
}
shared_ = shared;
#else
barrier_ = 0;
count_ = count;
#endif
}

void Barrier::wait() {
#ifdef SW_USE_PTHREAD_BARRIER
pthread_barrier_wait(&barrier_);
#else
sw_atomic_add_fetch(&barrier_, 1);
SW_LOOP {
if (barrier_ == count_) {
break;
}
usleep(BARRIER_USEC);
sw_atomic_memory_barrier();
}
#endif
}

void Barrier::destroy() {
#ifdef SW_USE_PTHREAD_BARRIER
pthread_barrier_destroy(&barrier_);
if (shared_) {
pthread_barrierattr_destroy(&barrier_attr_);
}
#endif
}

}; // namespace swoole
7 changes: 1 addition & 6 deletions src/server/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,7 @@ void Manager::wait(Server *_server) {
int sigid = SIGTERM;
procctl(P_PID, 0, PROC_PDEATHSIG_CTL, &sigid);
#endif

#if defined(HAVE_PTHREAD_BARRIER) && !(defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__))
pthread_barrier_wait(&_server->gs->manager_barrier);
#else
SW_START_SLEEP;
#endif
_server->gs->manager_barrier.wait();
}

if (_server->isset_hook(Server::HOOK_MANAGER_START)) {
Expand Down
Loading

0 comments on commit ce934b8

Please sign in to comment.