Skip to content
This repository has been archived by the owner on Dec 16, 2019. It is now read-only.

Commit

Permalink
Fixes #733: accept() recognizes socket blocking state
Browse files Browse the repository at this point in the history
  • Loading branch information
sirsnyder committed Dec 5, 2017
1 parent 03a9b22 commit 287bd05
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 5 deletions.
129 changes: 129 additions & 0 deletions examples/NonBlockingSocketServer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
<?php
class SocketServer extends Thread {
public $socket;
public $maxThreads;
public $isRunning;
public $threads;
public $host;
public $port;
public $backlog;
public function __construct(int $maxThreads, string $host, int $port, int $backlog) {
$this->maxThreads = $maxThreads;
$this->isRunning = false;
$this->threads = [ ];
$this->host = $host;
$this->port = $port;
$this->backlog = $backlog;
}
public function run() {
$this->socket = new Socket ( AF_INET, SOCK_STREAM, SOL_TCP );

$this->socket->setOption ( SOL_SOCKET, SO_REUSEADDR, 1 );

$this->socket->bind ( $this->host, $this->port );

$this->socket->listen ( $this->backlog );

$this->socket->setBlocking ( false );

$this->isRunning = true;

for($i = 0; $i < $this->maxThreads; $i ++) {
$this->threads [$i] = new RequestHandler ( $this->socket );
$this->threads [$i]->start ();
}

$this->synchronized ( function () {
do {
foreach ( $this->threads as $key => $thread ) {
if ($thread->isRunning ()) {
continue;
}
$thread->join ();
$this->threads [$key] = new RequestHandler ( $this->socket );
$this->threads [$key]->start ();
}
$this->wait ( 500000 ); // 500 ms
} while ( $this->isRunning );
} );
}
public function shutdown() {
$this->isRunning = false;
foreach ( $this->threads as $thread ) {
$thread->shutdown ();
}
$this->socket->close ();
}
}
class RequestHandler extends Thread {
public $mainSocket;
public $threadSocket;
public $isRunning;
public function __construct(Socket $socket) {
$this->mainSocket = $socket;
$this->isRunning = false;
}
public function run() {
$this->isRunning = true;

while ( $this->isRunning ) {

$this->threadSocket = $this->mainSocket->accept ();

if ($this->threadSocket === false) {
$this->synchronized ( function () {
$this->wait ( 5000 ); // 5ms
} );
continue;
}

$threadId = $this->getThreadId ();

$response = "Welcome\nYou are connected with Thread-ID $threadId\nEnter \"quit\" to quit\n\n";

$this->threadSocket->write ( $response, strlen ( $response ) );

do {
$buffer = trim ( $this->threadSocket->read ( 1024 ) );

if ('quit' === $buffer) {
break;
}

$talkBack = "You entered $buffer.\n";

$this->threadSocket->write ( $talkBack, strlen ( $talkBack ) );
} while ( $this->isRunning );

$this->threadSocket->write ( "Goodbye\n", 9 );
$this->threadSocket->close ();

$this->isRunning = false;
}
}
public function shutdown() {
$this->isRunning = false;
}
}

echo 'Multi-Threaded Socket Server started with PID ' . posix_getpid () . "\n";

$server = new SocketServer ( 3, '127.0.0.1', 9004, 2 );
$server->start ();

$running = true;

$shutdown = function () use (&$running, $server) {
echo "Shutting down... \n";
$running = false;
$server->shutdown ();
$server->join ();
echo "Finished\n";
};

pcntl_signal ( SIGINT, $shutdown );

while ( $running ) {
pcntl_signal_dispatch ();
usleep ( 10000 );
}
17 changes: 12 additions & 5 deletions src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ void pthreads_socket_accept(zval *object, zend_class_entry *ce, zval *return_val

php_sockaddr_storage sa;
socklen_t sa_len = sizeof(sa);
zend_bool blocking = threaded->store.sock->blocking;

PTHREADS_SOCKET_CHECK(threaded->store.sock);

Expand All @@ -300,15 +301,21 @@ void pthreads_socket_accept(zval *object, zend_class_entry *ce, zval *return_val
ZSTR_VAL(ce->name));
return;
}
php_socket_t acceptedFd = accept(threaded->store.sock->fd, (struct sockaddr*) &sa, &sa_len);

if(acceptedFd < 0) {
if(blocking) {
zend_throw_exception_ex(spl_ce_RuntimeException, 0,
"socket found in invalid state");
}
ZVAL_FALSE(return_value);
return;
}
object_init_ex(return_value, ce);

accepted = PTHREADS_FETCH_FROM(Z_OBJ_P(return_value));
accepted->store.sock->fd =
accept(threaded->store.sock->fd, (struct sockaddr*) &sa, &sa_len);

PTHREADS_SOCKET_CHECK(accepted->store.sock);

accepted->store.sock->fd = acceptedFd;
accepted->store.sock->blocking = 1;
accepted->store.sock->domain = ((struct sockaddr*) &sa)->sa_family;
}

Expand Down

0 comments on commit 287bd05

Please sign in to comment.