Skip to content

Commit

Permalink
Fixes krakjoe#733: accept() recognizes socket blocking state
Browse files Browse the repository at this point in the history
  • Loading branch information
sirsnyder committed Aug 25, 2017
1 parent 03a9b22 commit e863b88
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 5 deletions.
155 changes: 155 additions & 0 deletions examples/NonBlockingSocketServer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
<?php

class SocketServer extends Thread
{
public $socket;

public $maxThreads;

public $isRunning;

public $threads;

public $host;

public $port;

public $backlog;

public function __construct(int $maxThreads, string $host, int $port, int $backlog)
{
$this->maxThreads = $maxThreads;
$this->isRunning = false;
$this->threads = [];
$this->host = $host;
$this->port = $port;
$this->backlog = $backlog;
}

public function run()
{
$this->socket = new Socket(AF_INET, SOCK_STREAM, SOL_TCP);

$this->socket->setOption(SOL_SOCKET, SO_REUSEADDR, 1);

$this->socket->bind($this->host, $this->port);

$this->socket->listen($this->backlog);

$this->socket->setBlocking(false);

$this->isRunning = true;

for ($i = 0; $i < $this->maxThreads; $i++) {
$this->threads[$i] = new RequestHandler($this->socket);
$this->threads[$i]->start();
}

$this->synchronized(function () {
do {
foreach ($this->threads as $key => $thread) {
if ($thread->isRunning()) {
continue;
}
$thread->join();
$this->threads[$key] = new RequestHandler($this->socket);
$this->threads[$key]->start();
}
$this->wait(500000); // 500 ms
} while ($this->isRunning);
});
}

public function shutdown()
{
$this->isRunning = false;
foreach ($this->threads as $thread) {
$thread->shutdown();
}
$this->socket->close();
}
}

class RequestHandler extends Thread
{
public $mainSocket;

public $threadSocket;

public $isRunning;

public function __construct(Socket $socket)
{
$this->mainSocket = $socket;
$this->isRunning = false;
}

public function run()
{
$this->isRunning = true;

while($this->isRunning) {

$this->threadSocket = $this->mainSocket->accept();

if($this->threadSocket === false)
{
$this->synchronized(function ()
{
$this->wait(5000); // 5ms
});
continue;
}

$threadId = $this->getThreadId();

$response = "Welcome\nYou are connected with Thread-ID $threadId\nEnter \"quit\" to quit\n\n";

$this->threadSocket->write($response, strlen($response));

do {
$buffer = trim($this->threadSocket->read(1024));

if ('quit' === $buffer) {
break;
}

$talkBack = "You entered $buffer.\n";

$this->threadSocket->write($talkBack, strlen($talkBack));
} while ($this->isRunning);

$this->threadSocket->write("Goodbye\n", 9);
$this->threadSocket->close();

$this->isRunning = false;
}
}

public function shutdown()
{
$this->isRunning = false;
}
}

echo 'Multi-Threaded Socket Server started with PID ' . posix_getpid() . "\n";

$server = new SocketServer(3, '127.0.0.1', 9004, 2);
$server->start();

$running = true;

$shutdown = function () use (&$running, $server) {
echo "Shutting down... \n";
$running = false;
$server->shutdown();
$server->join();
echo "Finished\n";
};

pcntl_signal(SIGINT, $shutdown);

while ($running) {
pcntl_signal_dispatch();
usleep(10000);
};
17 changes: 12 additions & 5 deletions src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ void pthreads_socket_accept(zval *object, zend_class_entry *ce, zval *return_val

php_sockaddr_storage sa;
socklen_t sa_len = sizeof(sa);
zend_bool blocking = threaded->store.sock->blocking;

PTHREADS_SOCKET_CHECK(threaded->store.sock);

Expand All @@ -300,15 +301,21 @@ void pthreads_socket_accept(zval *object, zend_class_entry *ce, zval *return_val
ZSTR_VAL(ce->name));
return;
}
php_socket_t acceptedFd = accept(threaded->store.sock->fd, (struct sockaddr*) &sa, &sa_len);

if(acceptedFd < 0) {
if(blocking) {
zend_throw_exception_ex(spl_ce_RuntimeException, 0,
"socket found in invalid state");
}
ZVAL_FALSE(return_value);
return;
}
object_init_ex(return_value, ce);

accepted = PTHREADS_FETCH_FROM(Z_OBJ_P(return_value));
accepted->store.sock->fd =
accept(threaded->store.sock->fd, (struct sockaddr*) &sa, &sa_len);

PTHREADS_SOCKET_CHECK(accepted->store.sock);

accepted->store.sock->fd = acceptedFd;
accepted->store.sock->blocking = 1;
accepted->store.sock->domain = ((struct sockaddr*) &sa)->sa_family;
}

Expand Down

0 comments on commit e863b88

Please sign in to comment.