Skip to content

Commit

Permalink
Optimize thread co socket (#5370)
Browse files Browse the repository at this point in the history
* Optimize co socket resource security

* Optimize code

* Optimize code

* fix tests

* remove thread::exec()
  • Loading branch information
matyhtf authored Jun 11, 2024
1 parent 990d4aa commit 112f552
Show file tree
Hide file tree
Showing 31 changed files with 204 additions and 119 deletions.
2 changes: 1 addition & 1 deletion examples/thread/aio.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
$threads = [];
$atomic = new Swoole\Thread\Atomic();
for ($i = 0; $i < $c; $i++) {
$threads[] = Thread::exec(__FILE__, $i, $atomic);
$threads[] = new Thread(__FILE__, $i, $atomic);
}
for ($i = 0; $i < $c; $i++) {
$threads[$i]->join();
Expand Down
7 changes: 5 additions & 2 deletions examples/thread/argv.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

if (empty($args)) {
var_dump($GLOBALS['argv']);
$thread = Thread::exec(__FILE__, 'thread-1', $argc, $argv);
$thread->join();
$n = 2;
while ($n--) {
$thread = new Thread(__FILE__, 'thread-' . $n, $argc, $argv);
$thread->join();
}
} else {
var_dump($args[0], $args[1], $args[2]);
sleep(1);
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/atomic.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
$a1 = new Atomic;
$a2 = new Long;
for ($i = 0; $i < $c; $i++) {
$threads[] = Thread::exec(__FILE__, $i, $a1, $a2);
$threads[] = new Thread(__FILE__, $i, $a1, $a2);
}
for ($i = 0; $i < $c; $i++) {
$threads[$i]->join();
Expand Down
4 changes: 2 additions & 2 deletions examples/thread/co.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
$list[1] = uniqid();
var_dump(count($list));

$t1 = Swoole\Thread::exec('mt.php', 'thread-1', PHP_OS, $map, $list);
$t2 = Swoole\Thread::exec('mt.php', 'thread-2', PHP_OS, $map, $list);
$t1 = new Swoole\Thread('mt.php', 'thread-1', PHP_OS, $map, $list);
$t2 = new Swoole\Thread('mt.php', 'thread-2', PHP_OS, $map, $list);

//var_dump($t1->id);
//var_dump($t2->id);
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/lock.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
if (empty($args)) {
$lock = new Lock;
$lock->lock();
$thread = Thread::exec(__FILE__, $lock);
$thread = new Thread(__FILE__, $lock);
$lock->lock();
echo "main thread\n";
$thread->join();
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/mt.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
var_dump(count($list));

//if ($args[0] == 'thread-2') {
// $t3 = Swoole\Thread::exec('mt.php', 'thread-3', PHP_OS);
// $t3 = new Swoole\Thread('mt.php', 'thread-3', PHP_OS);
// $t3->join();
//}

Expand Down
2 changes: 1 addition & 1 deletion examples/thread/pipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
if (empty($args)) {
Co\run(function () {
$sockets = swoole_coroutine_socketpair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
$thread = Thread::exec(__FILE__, $sockets);
$thread = new Thread(__FILE__, $sockets);
echo $sockets[0]->recv(8192), PHP_EOL;
$thread->join();
});
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/run_test.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
$threads = [];

for ($i = 0; $i < $c; $i++) {
$threads[] = Swoole\Thread::exec('benchmark.php', 'thread-' . ($i + 1), $map);
$threads[] = new Swoole\Thread('benchmark.php', 'thread-' . ($i + 1), $map);
}

for ($i = 0; $i < $c; $i++) {
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
$threads = [];
$queue = new Queue;
for ($i = 0; $i < $c; $i++) {
$threads[] = Thread::exec(__FILE__, $i, $queue);
$threads[] = new Thread(__FILE__, $i, $queue);
}
for ($i = 0; $i < $c; $i++) {
$threads[$i]->join();
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/signal.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
Co\run(function () {
echo "main thread\n";
$sockets = swoole_coroutine_socketpair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
$thread = Thread::exec(__FILE__, $sockets);
$thread = new Thread(__FILE__, $sockets);
$parent_pipe = $sockets[1];
// 收到信号之后向子线程发送指令让子线程退出
if (System::waitSignal(SIGTERM)) {
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/thread_pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
$threads = [];
$queue = new Queue;
for ($i = 0; $i < $c; $i++) {
$threads[] = Thread::exec(__FILE__, $i, $queue);
$threads[] = new Thread(__FILE__, $i, $queue);
}
while ($n--) {
$queue->push(base64_encode(random_bytes(16)), Queue::NOTIFY_ONE);
Expand Down
6 changes: 4 additions & 2 deletions ext-src/php_swoole.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ zend_class_entry *swoole_error_ce;
zend_object_handlers swoole_error_handlers;

#ifdef COMPILE_DL_SWOOLE
#ifdef ZTS
ZEND_TSRMLS_CACHE_DEFINE()
#endif
ZEND_GET_MODULE(swoole)
#endif

Expand Down Expand Up @@ -1089,8 +1092,7 @@ PHP_RSHUTDOWN_FUNCTION(swoole) {
if (!zstream) {
return;
}
stream =
(php_stream *) zend_fetch_resource2_ex((zstream), NULL, php_file_le_stream(), php_file_le_pstream());
stream = (php_stream *) zend_fetch_resource2_ex((zstream), NULL, php_file_le_stream(), php_file_le_pstream());
if (!stream) {
return;
}
Expand Down
1 change: 1 addition & 0 deletions ext-src/php_swoole_cxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ extern zend_string **sw_zend_known_strings;
SW_API bool php_swoole_is_enable_coroutine();
SW_API zend_object *php_swoole_create_socket(enum swSocketType type);
SW_API zend_object *php_swoole_create_socket_from_fd(int fd, enum swSocketType type);
SW_API zend_object *php_swoole_create_socket_from_fd(int fd, int _domain, int _type, int _protocol);
SW_API bool php_swoole_export_socket(zval *zobject, swoole::coroutine::Socket *_socket);
SW_API zend_object *php_swoole_dup_socket(int fd, enum swSocketType type);
SW_API void php_swoole_init_socket_object(zval *zobject, swoole::coroutine::Socket *socket);
Expand Down
18 changes: 14 additions & 4 deletions ext-src/php_swoole_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,23 @@ bool php_swoole_thread_resource_free(ThreadResourceId resource_id, ThreadResourc
ThreadResource *php_swoole_thread_resource_fetch(ThreadResourceId resource_id);

void php_swoole_thread_start(zend_string *file, zend_string *argv);
zend_string *php_swoole_thread_serialize(zval *zdata);
bool php_swoole_thread_unserialize(zend_string *data, zval *zv);
zend_string *php_swoole_thread_argv_serialize(zval *zdata);
bool php_swoole_thread_argv_unserialize(zend_string *data, zval *zv);
zend_string *php_swoole_serialize(zval *zdata);
bool php_swoole_unserialize(zend_string *data, zval *zv);
void php_swoole_thread_argv_clean(zval *zdata);
void php_swoole_thread_bailout(void);

zval *php_swoole_thread_get_arguments();

#define EMSG_NO_RESOURCE "resource not found"
#define ECODE_NO_RESOURCE -2

#define IS_STREAM_SOCKET 98
#define IS_SERIALIZED_OBJECT 99
enum {
IS_CO_SOCKET = 97,
IS_STREAM_SOCKET = 98,
IS_SERIALIZED_OBJECT = 99,
};

struct ThreadResource {
uint32_t ref_count;
Expand All @@ -66,6 +72,10 @@ struct ArrayItem {
zend_string *str;
zend_long lval;
double dval;
struct {
int fd;
swSocketType type;
} socket;
zend_string *serialized_object;
} value;

Expand Down
3 changes: 0 additions & 3 deletions ext-src/stubs/php_swoole_socket_coro.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,5 @@ public function getpeername(): false|array {}
public function isClosed(): bool {}
/** @param resource $stream */
public static function import($stream) : Socket | false {}
#ifdef SW_THREAD
public function __wakeup(): void {}
#endif
}
}
1 change: 0 additions & 1 deletion ext-src/stubs/php_swoole_thread.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ public function join(): bool {}
public function joinable(): bool {}
public function detach(): bool {}

public static function exec(string $script_file, mixed ...$args): Thread {}
public static function getArguments(): array {}
public static function getId(): int {}
public static function getTsrmInfo(): array {}
Expand Down
2 changes: 1 addition & 1 deletion ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2651,7 +2651,7 @@ static PHP_METHOD(swoole_server, start) {

if (!ZVAL_IS_NULL(&server_object->init_arguments)) {
call_user_function(NULL, NULL, &server_object->init_arguments, &thread_argv, 0, NULL);
thread_argv_serialized = php_swoole_thread_serialize(&thread_argv);
thread_argv_serialized = php_swoole_thread_argv_serialize(&thread_argv);
}

serv->worker_thread_start = [bootstrap, thread_argv_serialized](const WorkerFn &fn) {
Expand Down
48 changes: 10 additions & 38 deletions ext-src/swoole_socket_coro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ static PHP_METHOD(swoole_socket_coro, getsockname);
static PHP_METHOD(swoole_socket_coro, getpeername);
static PHP_METHOD(swoole_socket_coro, isClosed);
static PHP_METHOD(swoole_socket_coro, import);
#ifdef SW_THREAD
static PHP_METHOD(swoole_socket_coro, __wakeup);
#endif
SW_EXTERN_C_END

// clang-format off
Expand Down Expand Up @@ -132,9 +129,6 @@ static const zend_function_entry swoole_socket_coro_methods[] =
PHP_ME(swoole_socket_coro, getsockname, arginfo_class_Swoole_Coroutine_Socket_getsockname, ZEND_ACC_PUBLIC)
PHP_ME(swoole_socket_coro, isClosed, arginfo_class_Swoole_Coroutine_Socket_isClosed, ZEND_ACC_PUBLIC)
PHP_ME(swoole_socket_coro, import, arginfo_class_Swoole_Coroutine_Socket_import, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
#ifdef SW_THREAD
PHP_ME(swoole_socket_coro, __wakeup, arginfo_class_Swoole_Coroutine_Socket___wakeup, ZEND_ACC_PUBLIC)
#endif
PHP_FE_END
};
// clang-format on
Expand Down Expand Up @@ -720,9 +714,7 @@ static void socket_coro_register_constants(int module_number) {

void php_swoole_socket_coro_minit(int module_number) {
SW_INIT_CLASS_ENTRY(swoole_socket_coro, "Swoole\\Coroutine\\Socket", "Co\\Socket", swoole_socket_coro_methods);
#ifndef SW_THREAD
SW_SET_CLASS_NOT_SERIALIZABLE(swoole_socket_coro);
#endif
SW_SET_CLASS_CLONEABLE(swoole_socket_coro, sw_zend_class_clone_deny);
SW_SET_CLASS_UNSET_PROPERTY_HANDLER(swoole_socket_coro, sw_zend_class_unset_property_deny);
SW_SET_CLASS_CUSTOM_OBJECT(
Expand Down Expand Up @@ -829,12 +821,12 @@ SW_API void php_swoole_socket_set_error_properties(zval *zobject, Socket *socket
php_swoole_socket_set_error_properties(zobject, socket->errCode, socket->errMsg);
}

SW_API zend_object *php_swoole_create_socket_from_fd(int fd, enum swSocketType type) {
static zend_object *create_socket_object(Socket *socket) {
zval zobject;
zend_object *object = socket_coro_create_object(swoole_socket_coro_ce);
SocketObject *sock = (SocketObject *) socket_coro_fetch_object(object);

sock->socket = new Socket(fd, type);
sock->socket = socket;
if (UNEXPECTED(sock->socket->get_fd() < 0)) {
php_swoole_sys_error(E_WARNING, "new Socket() failed");
delete sock->socket;
Expand All @@ -848,6 +840,14 @@ SW_API zend_object *php_swoole_create_socket_from_fd(int fd, enum swSocketType t
return object;
}

SW_API zend_object *php_swoole_create_socket_from_fd(int fd, enum swSocketType type) {
return create_socket_object(new Socket(fd, type));
}

SW_API zend_object *php_swoole_create_socket_from_fd(int fd, int _domain, int _type, int _protocol) {
return create_socket_object(new Socket(fd, _domain, _type, _protocol));
}

SW_API Socket *php_swoole_get_socket(zval *zobject) {
SW_ASSERT(Z_OBJCE_P(zobject) == swoole_socket_coro_ce);
SocketObject *sock = (SocketObject *) socket_coro_fetch_object(Z_OBJ_P(zobject));
Expand Down Expand Up @@ -2217,31 +2217,3 @@ static PHP_METHOD(swoole_socket_coro, import) {

RETURN_OBJ(object);
}

#ifdef SW_THREAD
static PHP_METHOD(swoole_socket_coro, __wakeup) {
zend_long sockfd = zend::object_get_long(ZEND_THIS, ZEND_STRL("fd"));
if (sockfd < 0) {
zend_throw_exception(swoole_exception_ce, EMSG_NO_RESOURCE, ECODE_NO_RESOURCE);
return;
}

zend_long new_sockfd = dup(sockfd);
if (sockfd < 0) {
zend_throw_exception(swoole_exception_ce, EMSG_NO_RESOURCE, ECODE_NO_RESOURCE);
return;
}

SocketObject *sock = (SocketObject *) socket_coro_fetch_object(Z_OBJ_P(ZEND_THIS));

zend_long domain = zend::object_get_long(ZEND_THIS, ZEND_STRL("domain"));
zend_long type = zend::object_get_long(ZEND_THIS, ZEND_STRL("type"));
zend_long protocol = zend::object_get_long(ZEND_THIS, ZEND_STRL("protocol"));

php_swoole_check_reactor();
sock->socket = new Socket((int) new_sockfd, (int) domain, (int) type, (int) protocol);
sock->socket->set_zero_copy(true);
sock->socket->set_buffer_allocator(sw_zend_string_allocator());
zend_update_property_long(swoole_socket_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("fd"), sock->socket->get_fd());
}
#endif
Loading

0 comments on commit 112f552

Please sign in to comment.