Так что опыт использования вебсокет в продакшене накопился приличный. И вот недавно случилось событие, которое сподвигло меня написать первую статью на Хабре.
После того, как игрушка была опубликована в социальной сети, я поправил все найденные критические/блокирующие баги и начал приводить всё в порядок в спокойном режиме. Я хочу обратить внимание на то, что этот вот пример — это вообще единственный в интернете гайд, который содержит серверный код, который можно вставить себе в код и использовать его. Ну вот набрать в поисковике «php websocket server» — попробуйте что-то найти, что можно себе поставить.
Внезапно я перечитываю указанную выше статью и в самом начале обнаруживаю ссылки на «phpdaemon» и «ratchet». Ну думаю, давай в спокойном режиме посмотрю на код тамошний. В PhpDeamon в недрах обработки WebSocket соединения небольшое, но безумно важное ветвление на протоколы WebSocket. И там прямо написано для одного case «Safari5 and many non-browser clients». Сказать, что я офигел — это ничего не сказать. Перед глазами пронеслись несколько сотен часов, тонны нервотрёпки и страдания, которые поставили под вопрос вообще проект. Я не поверил, решил проверить.
В течении ~15 часов я вытянул из PhpDeamon минимальный код, связанный с WebSocket (который работает во всех браузерах последней версии, а сам серверный код может работать под высокой нагрузкой) и его постараюсь опубликовать с объяснениями. Чтобы другие люди не испытали те мучения, через которые мне пришлось пройти. Да, кусок кода получился не маленький, но извините: WebSocket он на клиентской части очень простой, а на стороне сервера всё довольно объёмно (скажем отдельное «спасибо» разработчикам Сафари). Также в связи с тем, что область применения WebSocket — это в первую очередь игры, важен вопрос неблокирующего использования серверного сокета — это бонусная сложность, которая никак здесь не рассматривается, хотя и очень важна.
Тестовое приложение я хотел написать без объектов, чтобы было понятнее. Но, к сожалению, такой подход в данном примере расплодит много повторяющегося кода, поэтому пришлось добавить 1 класс и 3 его наследника. Остальное всё без объектов.
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<title>WebSocket test page</title>
</head>
<body onload="create();">
<script type="text/javascript">
function create() {
// Example
ws = new WebSocket('ws://'+document.domain+':8081/');
ws.onopen = function () {document.getElementById('log').innerHTML += 'WebSocket opened <br/>';}
ws.onmessage = function (e) {document.getElementById('log').innerHTML += 'WebSocket message: '+e.data+' <br/>';}
ws.onclose = function () {document.getElementById('log').innerHTML += 'WebSocket closed <br/>';}
}
</script>
<button onclick="create();">Create WebSocket</button>
<button onclick="ws.send('ping');">Send ping</button>
<button onclick="ws.close();">Close WebSocket</button>
<div id="log" style="width:300px; height: 300px; border: 1px solid #999999; overflow:auto;"></div>
</body>
</html>
В моей игре мне пришлось использовать 3 сокет сервера. Для websocket, для worker`ов и для longpooling. В игре очень много математики, поэтому надо было делать вёркеры и выдавать им задачи на вычисления. Так вот к чему это. Что stream_select для них всех должен быть общий, иначе будут лаги или безумное использование процессора. Это знание тоже было получено взамен кучи истраченных нервов.
$master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr);
if (!$master) die("$errstr ($errno)\n");
$sockets = array($master);
stream_set_blocking($master, false); // Относительно этой команды я не уверен, потому что мастер из сокетов читает только новые соединения, и для чтения используется "stream_socket_accept". Вариант, что весь сервис будет подвешен на несколько секунд из-за того, что клиент не торопится соединятся - категорически неприемлемо.
while (true) {
$read = $sockets;
$write = $except = array();
if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) {
var_dump('stream_select error');
break;
// Сделать выход из цикла, а не "die", потому что в продакшине скорей всего этот код будет выполняться как сервис и при команде "/etc/init.d/game restart" тут 100% будет этот case, так вот надо дать "pcntl" код нормально отработать и не мешать ему.
}
foreach ($read as $socket) {
$index_socket = array_search($socket, $sockets);
if ($index_socket == 0) {
// Новое соединение
continue;
}
// Тут будет обработка сообщений клиентов
}
}
Соединение с новыми клиентами вполне себе стандартный код, но вот из-за того, что сокеты у нас в неблокирующем режиме, нужно написать кучу кода, который по кусочкам соберёт все входящие данные и, когда данных будет достаточно, обработает их, поймёт какой протокол надо использовать и переключится на использование этого протокола. Одна эта задача — уже гора кода, и в PhpDeamon нагородили много кода, который к WebSocket отношения не имеет (они же там 8 разных серверов умеют подымать). Удалось многое отрезать и упростить в этой теме. Оставил только то, что относится к WebSocket.
class ws {
const MAX_BUFFER_SIZE = 1024 * 1024;
protected $socket;
/**
* @var array _SERVER
*/
public $server = [];
protected $headers = [];
protected $closed = false;
protected $unparsed_data = '';
private $current_header;
private $unread_lines = array();
protected $extensions = [];
protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS';
/**
* @var integer Current state
*/
protected $state = 0; // stream state of the connection (application protocol level)
/**
* Alias of STATE_STANDBY
*/
const STATE_ROOT = 0;
/**
* Standby state (default state)
*/
const STATE_STANDBY = 0;
/**
* State: first line
*/
const STATE_FIRSTLINE = 1;
/**
* State: headers
*/
const STATE_HEADERS = 2;
/**
* State: content
*/
const STATE_CONTENT = 3;
/**
* State: prehandshake
*/
const STATE_PREHANDSHAKE = 5;
/**
* State: handshaked
*/
const STATE_HANDSHAKED = 6;
public function get_state() {
return $this->state;
}
public function closed() {
return $this->closed;
}
protected function close() {
if ($this->closed) return;
var_dump('self close');
fclose($this->socket);
$this->closed = true;
}
public function __construct($socket) {
stream_set_blocking($socket, false);
$this->socket = $socket;
}
private function read_line() {
$lines = explode(PHP_EOL, $this->unparsed_data);
$last_line = $lines[count($lines)-1];
unset($lines[count($lines) - 1]);
foreach ($lines as $line) {
$this->unread_lines[] = $line;
}
$this->unparsed_data = $last_line;
if (count($this->unread_lines) != 0) {
return array_shift($this->unread_lines);
} else {
return null;
}
}
public function on_receive_data() {
if ($this->closed) return;
$data = stream_socket_recvfrom($this->socket, MAX_BUFFER_SIZE);
if (is_string($data)) {
$this->unparsed_data .= $data;
}
}
/**
* Called when new data received.
* @return void
*/
public function on_read() {
if ($this->closed) return;
if ($this->state === self::STATE_STANDBY) {
$this->state = self::STATE_FIRSTLINE;
}
if ($this->state === self::STATE_FIRSTLINE) {
if (!$this->http_read_first_line()) {
return;
}
$this->state = self::STATE_HEADERS;
}
if ($this->state === self::STATE_HEADERS) {
if (!$this->http_read_headers()) {
return;
}
if (!$this->http_process_headers()) {
$this->close();
return;
}
$this->state = self::STATE_CONTENT;
}
if ($this->state === self::STATE_CONTENT) {
$this->state = self::STATE_PREHANDSHAKE;
}
}
/**
* Read first line of HTTP request
* @return boolean|null Success
*/
protected function http_read_first_line() {
if (($l = $this->read_line()) === null) {
return null;
}
$e = explode(' ', $l);
$u = isset($e[1]) ? parse_url($e[1]) : false;
if ($u === false) {
$this->bad_request();
return false;
}
if (!isset($u['path'])) {
$u['path'] = null;
}
if (isset($u['host'])) {
$this->server['HTTP_HOST'] = $u['host'];
}
$srv = & $this->server;
$srv['REQUEST_METHOD'] = $e[0];
$srv['REQUEST_TIME'] = time();
$srv['REQUEST_TIME_FLOAT'] = microtime(true);
$srv['REQUEST_URI'] = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : '');
$srv['DOCUMENT_URI'] = $u['path'];
$srv['PHP_SELF'] = $u['path'];
$srv['QUERY_STRING'] = isset($u['query']) ? $u['query'] : null;
$srv['SCRIPT_NAME'] = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/';
$srv['SERVER_PROTOCOL'] = isset($e[2]) ? $e[2] : 'HTTP/1.1';
$srv['REMOTE_ADDR'] = null;
$srv['REMOTE_PORT'] = null;
return true;
}
/**
* Read headers line-by-line
* @return boolean|null Success
*/
protected function http_read_headers() {
while (($l = $this->read_line()) !== null) {
if ($l === '') {
return true;
}
$e = explode(': ', $l);
if (isset($e[1])) {
$this->current_header = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_']));
$this->server[$this->current_header] = $e[1];
} elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) {
// multiline header continued
$this->server[$this->current_header] .= $e[0];
} else {
// whatever client speaks is not HTTP anymore
$this->bad_request();
return false;
}
}
}
/**
* Process headers
* @return bool
*/
protected function http_process_headers() {
$this->state = self::STATE_PREHANDSHAKE;
if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) {
$str = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']);
$str = preg_replace($this->extensionsCleanRegex, '', $str);
$this->extensions = explode(', ', $str);
}
if (!isset($this->server['HTTP_CONNECTION'])
|| (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade")
|| !isset($this->server['HTTP_UPGRADE'])
|| (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important
) {
$this->close();
return false;
}
if (isset($this->server['HTTP_COOKIE'])) {
self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie);
}
if (isset($this->server['QUERY_STRING'])) {
self::parse_str($this->server['QUERY_STRING'], $this->get);
}
// ----------------------------------------------------------
// Protocol discovery, based on HTTP headers...
// ----------------------------------------------------------
if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI
if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14)
$this->switch_to_protocol('v13');
} elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol
$this->switch_to_protocol('v13');
} else {
error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr
$this->close();
return false;
}
} elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
$this->switch_to_protocol('ve');
} else { // Defaulting to HIXIE (Safari5 and many non-browser clients...)
$this->switch_to_protocol('v0');
}
// ----------------------------------------------------------
// End of protocol discovery
// ----------------------------------------------------------
return true;
}
private function switch_to_protocol($protocol) {
$class = 'ws_'.$protocol;
$this->new_instance = new $class($this->socket);
$this->new_instance->state = $this->state;
$this->new_instance->unparsed_data = $this->unparsed_data;
$this->new_instance->server = $this->server;
}
/**
* Send Bad request
* @return void
*/
public function bad_request() {
$this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>");
$this->close();
}
/**
* Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX
* @param string $s String to parse
* @param array &$var Reference to the resulting array
* @param boolean $header Header-style string
* @return void
*/
public static function parse_str($s, &$var, $header = false)
{
static $cb;
if ($cb === null) {
$cb = function ($m) {
return urlencode(html_entity_decode('' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8'));
};
}
if ($header) {
$s = strtr($s, self::$hvaltr);
}
if (
(stripos($s, '%u') !== false)
&& preg_match('~(%u[a-f\d]{4}|%[c-f][a-f\d](?!%[89a-f][a-f\d]))~is', $s, $m)
) {
$s = preg_replace_callback('~%(u[a-f\d]{4}|[a-f\d]{2})~i', $cb, $s);
}
parse_str($s, $var);
}
/**
* Send data to the connection. Note that it just writes to buffer that flushes at every baseloop
* @param string $data Data to send
* @return boolean Success
*/
public function write($data) {
if ($this->closed) return false;
return stream_socket_sendto($this->socket, $data) == 0;
}
}
Смысл этого класса в таком урезанном виде — в конструкторе установить неблокирующий режим для соединения с клиентом. Далее в основном цикле, каждый раз, когда приходят данные — сразу их прочитать и положить (дополнить) в «unparsed_data» переменную (это метод on_receive_data). Важно понимать, что если мы выйдем за размеры MAX_BUFFER_SIZE, то вообще ничего страшного не случится. Можно в итоговом примере, что тут будет, поставить его значение, скажем, «5» и убедится, что всё по-прежнему работает. Просто данные из буфера на первом шаге будут проигнорированы — они ведь неполные будут, и со второго или пятого или сотого захода наберутся, наконец, все принятые данные и будут обработаны. При этом stream_select в основном цикле ждать не будет даже микросекунды, пока все данные не будут извлечены. Константу надо подобрать такую, чтобы 95% ожидаемых данных читались целиком.
Далее в основном цикле (после получения очередной порции данных) мы пробуем накопленные данные обработать (это метод on_read). В классе «ws» метод «on_read» состоит по сути из трёх шагов: «читаем первую строку и готовим переменные окружения», «читаем все заголовки», «обрабатываем все заголовки». Первые 2 пояснять не надо, но написаны они довольно объёмно потому, что мы в неблокирующем режиме и надо быть готовым к тому, что данные оборваны в любом месте. Обработка заголовков сначала проверяет формат запроса правильный или нет, а потом по заголовкам определяет протокол, по которому будет общаться с клиентом. В итоге должны дёрнуть метод switch_to_protocol. Этот метод внутри себя сформирует экземпляр класса «ws_<протокол>» и подготовит его для отдачи в основной цикл.
В основном цикле далее надо собственно проверить: а не надо ли подменить объект (если кто-то может предложить реализацию этого места лучше — всегда пожалуйста).
Далее в основном цикле надо поставить проверку: а не закрыт ли сокет. Если закрыт, то очистить память (об этом дальнее в следующем блоке).
require('ws.php');
require('ws_v0.php');
require('ws_v13.php');
require('ws_ve.php');
$master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr);
if (!$master) die("$errstr ($errno)\n");
$sockets = array($master);
/**
* @var ws[] $connections
*/
$connections = array();
stream_set_blocking($master, false);
/**
* @param ws $connection
* @param $data
* @param $type
*/
$my_callback = function($connection, $data, $type) {
var_dump('my ws data: ['.$data.'/'.$type.']');
$connection->send_frame('test '.time());
};
while (true) {
$read = $sockets;
$write = $except = array();
if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) {
var_dump('stream_select error');
break;
}
foreach ($read as $socket) {
$index_socket = array_search($socket, $sockets);
if ($index_socket == 0) {
// Новое соединение
if ($socket_new = stream_socket_accept($master, -1)) {
$connection = new ws($socket_new, $my_callback);
$sockets[] = $socket_new;
$index_new_socket = array_search($socket_new, $sockets);
$connections[$index_new_socket] = &$connection;
$index_socket = $index_new_socket;
} else {
// Я так и не понял что в этом случае надо делать
error_log('stream_socket_accept');
var_dump('error stream_socket_accept');
continue;
}
}
$connection = &$connections[$index_socket];
$connection->on_receive_data();
$connection->on_read();
if ($connection->get_state() == ws::STATE_PREHANDSHAKE) {
$connection = $connection->get_new_instance();
$connections[$index_socket] = &$connection;
$connection->on_read();
}
if ($connection->closed()) {
unset($sockets[$index_socket]);
unset($connections[$index_socket]);
unset($connection);
var_dump('close '.$index_socket);
}
}
}
Тут добавлен "$my_callback" — это наш custom обработчик сообщений от клиента. Разумеется в продакшине можно завернуть это всё в объекты всякие, а тут чтобы было понятнее просто переменная-функция. О ней чуть позже подробнее.
Реализована обработка нового соединения и реализовано основное тело цикла, о котором я чуть выше писал.
Я хочу обратить внимание на код сервера тут. Что если прочтённые данные из сокета — это пустая строка (да, разумеется я видел там в update проверку на пустую строку), то сокет надо закрыть. Ох, я даже не знаю, сколько этот момет попил мне кровушки и скольких пользователей я потерял. Внезапнейшим образом Сафари отправляет пустую строку и считает это нормой, а этот код берёт и закрывает соединение пользователю. Яндекс-браузер иногда ведёт себя так же. Уж не знаю почему, но в этом случае для Сафари WebSocket остаётся зависшим, то есть он не закрывается, не открывается — просто висит и всё. Вы уже заметили, что я неравнодушен к этому волшебному браузеру? Мне вспоминается, как я верстал под IE6 — примерно такие же ощущения.
Теперь о том, зачем я использую array_search и синхронизирую массив $sockets и массив $connections. Дело в том, что stream_select жизненно необходим чистый массив $sockets и никак иначе. Но как-то надо же связать конкретный сокет из массива $sockets с объектом «ws». Перепробовал кучу вариантов — в итоге остановился на таком варианте, что есть 2 массива, которые постоянно синхронизированы по ключам. В одном массиве неоходимые чистые сокеты для stream_select, а во втором экземпляры класса «ws» или его наследники. Если кто-то может предложить это место лучше — предлагайте.
Ещё отдельно надо отметить случай, когда stream_socket_accept зафэйлился. Я так понимаю, теоретически это может быть только в том случае, если мастер сокет у нас в неблокирующем режиме, и приехало недостаточно данных для соединения клиента. Поэтому просто ничего не делаем.
class ws {
private static $hvaltr = ['; ' => '&', ';' => '&', ' ' => '%20'];
const maxAllowedPacket = 1024 * 1024 * 1024;
const MAX_BUFFER_SIZE = 1024 * 1024;
protected $socket;
/**
* @var array _SERVER
*/
public $server = [];
protected $on_frame_user = null;
protected $handshaked = false;
protected $headers = [];
protected $headers_sent = false;
protected $closed = false;
protected $unparsed_data = '';
private $current_header;
private $unread_lines = array();
/**
* @var ws|null
*/
private $new_instance = null;
protected $extensions = [];
protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS';
/**
* @var integer Current state
*/
protected $state = 0; // stream state of the connection (application protocol level)
/**
* Alias of STATE_STANDBY
*/
const STATE_ROOT = 0;
/**
* Standby state (default state)
*/
const STATE_STANDBY = 0;
/**
* State: first line
*/
const STATE_FIRSTLINE = 1;
/**
* State: headers
*/
const STATE_HEADERS = 2;
/**
* State: content
*/
const STATE_CONTENT = 3;
/**
* State: prehandshake
*/
const STATE_PREHANDSHAKE = 5;
/**
* State: handshaked
*/
const STATE_HANDSHAKED = 6;
public function get_state() {
return $this->state;
}
public function get_new_instance() {
return $this->new_instance;
}
public function closed() {
return $this->closed;
}
protected function close() {
if ($this->closed) return;
var_dump('self close');
fclose($this->socket);
$this->closed = true;
}
public function __construct($socket, $on_frame_user = null) {
stream_set_blocking($socket, false);
$this->socket = $socket;
$this->on_frame_user = $on_frame_user;
}
private function read_line() {
$lines = explode(PHP_EOL, $this->unparsed_data);
$last_line = $lines[count($lines)-1];
unset($lines[count($lines) - 1]);
foreach ($lines as $line) {
$this->unread_lines[] = $line;
}
$this->unparsed_data = $last_line;
if (count($this->unread_lines) != 0) {
return array_shift($this->unread_lines);
} else {
return null;
}
}
public function on_receive_data() {
if ($this->closed) return;
$data = stream_socket_recvfrom($this->socket, self::MAX_BUFFER_SIZE);
if (is_string($data)) {
$this->unparsed_data .= $data;
}
}
/**
* Called when new data received.
* @return void
*/
public function on_read() {
if ($this->closed) return;
if ($this->state === self::STATE_STANDBY) {
$this->state = self::STATE_FIRSTLINE;
}
if ($this->state === self::STATE_FIRSTLINE) {
if (!$this->http_read_first_line()) {
return;
}
$this->state = self::STATE_HEADERS;
}
if ($this->state === self::STATE_HEADERS) {
if (!$this->http_read_headers()) {
return;
}
if (!$this->http_process_headers()) {
$this->close();
return;
}
$this->state = self::STATE_CONTENT;
}
if ($this->state === self::STATE_CONTENT) {
$this->state = self::STATE_PREHANDSHAKE;
}
}
/**
* Read first line of HTTP request
* @return boolean|null Success
*/
protected function http_read_first_line() {
if (($l = $this->read_line()) === null) {
return null;
}
$e = explode(' ', $l);
$u = isset($e[1]) ? parse_url($e[1]) : false;
if ($u === false) {
$this->bad_request();
return false;
}
if (!isset($u['path'])) {
$u['path'] = null;
}
if (isset($u['host'])) {
$this->server['HTTP_HOST'] = $u['host'];
}
$address = explode(':', stream_socket_get_name($this->socket, true)); //получаем адрес клиента
$srv = & $this->server;
$srv['REQUEST_METHOD'] = $e[0];
$srv['REQUEST_TIME'] = time();
$srv['REQUEST_TIME_FLOAT'] = microtime(true);
$srv['REQUEST_URI'] = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : '');
$srv['DOCUMENT_URI'] = $u['path'];
$srv['PHP_SELF'] = $u['path'];
$srv['QUERY_STRING'] = isset($u['query']) ? $u['query'] : null;
$srv['SCRIPT_NAME'] = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/';
$srv['SERVER_PROTOCOL'] = isset($e[2]) ? $e[2] : 'HTTP/1.1';
$srv['REMOTE_ADDR'] = $address[0];
$srv['REMOTE_PORT'] = $address[1];
return true;
}
/**
* Read headers line-by-line
* @return boolean|null Success
*/
protected function http_read_headers() {
while (($l = $this->read_line()) !== null) {
if ($l === '') {
return true;
}
$e = explode(': ', $l);
if (isset($e[1])) {
$this->current_header = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_']));
$this->server[$this->current_header] = $e[1];
} elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) {
// multiline header continued
$this->server[$this->current_header] .= $e[0];
} else {
// whatever client speaks is not HTTP anymore
$this->bad_request();
return false;
}
}
}
/**
* Process headers
* @return bool
*/
protected function http_process_headers() {
$this->state = self::STATE_PREHANDSHAKE;
if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) {
$str = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']);
$str = preg_replace($this->extensionsCleanRegex, '', $str);
$this->extensions = explode(', ', $str);
}
if (!isset($this->server['HTTP_CONNECTION'])
|| (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade")
|| !isset($this->server['HTTP_UPGRADE'])
|| (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important
) {
$this->close();
return false;
}
/*
if (isset($this->server['HTTP_COOKIE'])) {
self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie);
}
if (isset($this->server['QUERY_STRING'])) {
self::parse_str($this->server['QUERY_STRING'], $this->get);
}
*/
// ----------------------------------------------------------
// Protocol discovery, based on HTTP headers...
// ----------------------------------------------------------
if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI
if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14)
$this->switch_to_protocol('v13');
} elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol
$this->switch_to_protocol('v13');
} else {
error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr
$this->close();
return false;
}
} elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
$this->switch_to_protocol('ve');
} else { // Defaulting to HIXIE (Safari5 and many non-browser clients...)
$this->switch_to_protocol('v0');
}
// ----------------------------------------------------------
// End of protocol discovery
// ----------------------------------------------------------
return true;
}
private function switch_to_protocol($protocol) {
$class = 'ws_'.$protocol;
$this->new_instance = new $class($this->socket);
$this->new_instance->state = $this->state;
$this->new_instance->unparsed_data = $this->unparsed_data;
$this->new_instance->server = $this->server;
$this->new_instance->on_frame_user = $this->on_frame_user;
}
/**
* Send Bad request
* @return void
*/
public function bad_request() {
$this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>");
$this->close();
}
/**
* Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX
* @param string $s String to parse
* @param array &$var Reference to the resulting array
* @param boolean $header Header-style string
* @return void
*/
public static function parse_str($s, &$var, $header = false) {
static $cb;
if ($cb === null) {
$cb = function ($m) {
return urlencode(html_entity_decode('' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8'));
};
}
if ($header) {
$s = strtr($s, self::$hvaltr);
}
if (
(stripos($s, '%u') !== false)
&& preg_match('~(%u[a-f\d]{4}|%[c-f][a-f\d](?!%[89a-f][a-f\d]))~is', $s, $m)
) {
$s = preg_replace_callback('~%(u[a-f\d]{4}|[a-f\d]{2})~i', $cb, $s);
}
parse_str($s, $var);
}
/**
* Send data to the connection. Note that it just writes to buffer that flushes at every baseloop
* @param string $data Data to send
* @return boolean Success
*/
public function write($data) {
if ($this->closed) return false;
return stream_socket_sendto($this->socket, $data) == 0;
}
/**
* Будте любезны в отнаследованном классе реализовать этот метод
* @return bool
*/
protected function send_handshake_reply() {
return false;
}
/**
* Called when we're going to handshake.
* @return boolean Handshake status
*/
public function handshake() {
$extra_headers = '';
foreach ($this->headers as $k => $line) {
if ($k !== 'STATUS') {
$extra_headers .= $line . "\r\n";
}
}
if (!$this->send_handshake_reply($extra_headers)) {
error_log(get_class($this) . '::' . __METHOD__ . ' : Handshake protocol failure for client ""'); // $this->addr
$this->close();
return false;
}
$this->handshaked = true;
$this->headers_sent = true;
$this->state = static::STATE_HANDSHAKED;
return true;
}
/**
* Read from buffer without draining
* @param integer $n Number of bytes to read
* @param integer $o Offset
* @return string|false
*/
public function look($n, $o = 0) {
if (strlen($this->unparsed_data) <= $o) {
return '';
}
return substr($this->unparsed_data, $o, $n);
}
/**
* Convert bytes into integer
* @param string $str Bytes
* @param boolean $l Little endian? Default is false
* @return integer
*/
public static function bytes2int($str, $l = false) {
if ($l) {
$str = strrev($str);
}
$dec = 0;
$len = strlen($str);
for ($i = 0; $i < $len; ++$i) {
$dec += ord(substr($str, $i, 1)) * pow(0x100, $len - $i - 1);
}
return $dec;
}
/**
* Drains buffer
* @param integer $n Numbers of bytes to drain
* @return boolean Success
*/
public function drain($n) {
$ret = substr($this->unparsed_data, 0, $n);
$this->unparsed_data = substr($this->unparsed_data, $n);
return $ret;
}
/**
* Read data from the connection's buffer
* @param integer $n Max. number of bytes to read
* @return string|false Readed data
*/
public function read($n) {
if ($n <= 0) {
return '';
}
$read = $this->drain($n);
if ($read === '') {
return false;
}
return $read;
}
/**
* Reads all data from the connection's buffer
* @return string Readed data
*/
public function read_unlimited() {
$ret = $this->unparsed_data;
$this->unparsed_data = '';
return $ret;
}
/**
* Searches first occurence of the string in input buffer
* @param string $what Needle
* @param integer $start Offset start
* @param integer $end Offset end
* @return integer Position
*/
public function search($what, $start = 0, $end = -1) {
return strpos($this->unparsed_data, $what, $start);
}
/**
* Called when new frame received.
* @param string $data Frame's data.
* @param string $type Frame's type ("STRING" OR "BINARY").
* @return boolean Success.
*/
public function on_frame($data, $type) {
if (is_callable($this->on_frame_user)) {
call_user_func($this->on_frame_user, $this, $data, $type);
}
return true;
}
public function send_frame($data, $type = null, $cb = null) {
return false;
}
/**
* Get real frame type identificator
* @param $type
* @return integer
*/
public function get_frame_type($type) {
if (is_int($type)) {
return $type;
}
if ($type === null) {
$type = 'STRING';
}
$frametype = @constant(get_class($this) . '::' . $type);
if ($frametype === null) {
error_log(__METHOD__ . ' : Undefined frametype "' . $type . '"');
}
return $frametype;
}
}
По сути тут добавлены 3 вещи: «соединение с клиентом на уровне веб сокета», «получение сообщения от клиента», «отправка сообщения клиенту».
Для начала немного теории и терминологии. «Handshake» — это с точки зрения веб сокетов процедура установления соединения поверх http. Надо ведь решить кучу вопросов: как пробиться сквозь гущу прокси и кэшэй, как защитится от злых хакеров. И термин «frame» — это кусок данных в расшифрованном виде, это сообщение от клиента или сообщение для клиента. Возможно, об этом стоило написать в начале статьи, но из-за этих вот «frame» делать сокет сервер в блокирующем режиме сокетов имхо бессмысленно. То, как сделан этот момент вот тут — это лишило меня сна не на одну ночь. В той статье не рассматривается вариант, что frame приехал не полностью или их приехало сразу два. И то и то, между прочим, вполне себе типичная ситуация, как показали логи игры.
Теперь к деталям.
Соединение с клиентом на уровне веб сокета — предполагается, что протокол (например, ws_v0) перекроет метод «on_read» и внутри себя дёрнет «handshake», когда данных будет достаточно. Далее кусок «handshake» в родителе. Далее дёргается метод «send_handshake_reply», который должен быть реализован в протоколе. Этот вот «send_handshake_reply» должен такое ответить клиенту, чтобы тот понял, что «соединение установлено», нормальным браузерам — нормальный ответ, а для Сафари — особый ответ.
Получение сообщения от клиента. Обращаю внимание, что глупые клиенты могут реализовать такой вариант, что соединение не установлено, а сообщение от пользователя уже пришло. Поэтому надо бережно относится к «unparsed_data» переменной. В каждом протоколе метод «on_read» должен понять размер передаваемого frame, убедиться, что frame целиком приехал, расшифровать приехавший frame в сообщение пользователя. В каждом протоколе это делается очень по-разному и очень кучеряво (мы ж не знаем, приехал frame полностью или нет, плюс нельзя откусить ни байта следующего frame). Далее внутри «on_read», когда данные клиента получены и расшифрованы и определён их тип (да-да и такое предусмотрено), дёргаем метод «on_frame», который внутри класса «ws», тот, в свою очередь, дёрнет custom callback (функция $my_callback, перед основным циклом которая). И в итоге $my_callback получает сообщение от клиента.
Отправка сообщения клиенту. Просто дёргается метод «send_frame», который должен быть реализован внутри протокола. Тут просто шифруем сообщение и отправляем пользователю. Разные протоколы шифруют по-разному.
Теперь прилагаю 3 протокола «v13», «v0», «ve»:
class ws_v13 extends ws {
const CONTINUATION = 0;
const STRING = 0x1;
const BINARY = 0x2;
const CONNCLOSE = 0x8;
const PING = 0x9;
const PONG = 0xA;
protected static $opcodes = [
0 => 'CONTINUATION',
0x1 => 'STRING',
0x2 => 'BINARY',
0x8 => 'CONNCLOSE',
0x9 => 'PING',
0xA => 'PONG',
];
protected $outgoingCompression = 0;
protected $framebuf = '';
/**
* Apply mask
* @param $data
* @param string|false $mask
* @return mixed
*/
public function mask($data, $mask) {
for ($i = 0, $l = strlen($data), $ml = strlen($mask); $i < $l; $i++) {
$data[$i] = $data[$i] ^ $mask[$i % $ml];
}
return $data;
}
/**
* Sends a frame.
* @param string $data Frame's data.
* @param string $type Frame's type. ("STRING" OR "BINARY")
* @param callable $cb Optional. Callback called when the frame is received by client.
* @callback $cb ( )
* @return boolean Success.
*/
public function send_frame($data, $type = null, $cb = null) {
if (!$this->handshaked) {
return false;
}
if ($this->closed && $type !== 'CONNCLOSE') {
return false;
}
/*if (in_array($type, ['STRING', 'BINARY']) && ($this->outgoingCompression > 0) && in_array('deflate-frame', $this->extensions)) {
//$data = gzcompress($data, $this->outgoingCompression);
//$rsv1 = 1;
}*/
$fin = 1;
$rsv1 = 0;
$rsv2 = 0;
$rsv3 = 0;
$this->write(chr(bindec($fin . $rsv1 . $rsv2 . $rsv3 . str_pad(decbin($this->get_frame_type($type)), 4, '0', STR_PAD_LEFT))));
$dataLength = strlen($data);
$isMasked = false;
$isMaskedInt = $isMasked ? 128 : 0;
if ($dataLength <= 125) {
$this->write(chr($dataLength + $isMaskedInt));
} elseif ($dataLength <= 65535) {
$this->write(chr(126 + $isMaskedInt) . // 126 + 128
chr($dataLength >> 8) .
chr($dataLength & 0xFF));
} else {
$this->write(chr(127 + $isMaskedInt) . // 127 + 128
chr($dataLength >> 56) .
chr($dataLength >> 48) .
chr($dataLength >> 40) .
chr($dataLength >> 32) .
chr($dataLength >> 24) .
chr($dataLength >> 16) .
chr($dataLength >> 8) .
chr($dataLength & 0xFF));
}
if ($isMasked) {
$mask = chr(mt_rand(0, 0xFF)) .
chr(mt_rand(0, 0xFF)) .
chr(mt_rand(0, 0xFF)) .
chr(mt_rand(0, 0xFF));
$this->write($mask . $this->mask($data, $mask));
} else {
$this->write($data);
}
if ($cb !== null) {
$cb();
}
return true;
}
/**
* Sends a handshake message reply
* @param string Received data (no use in this class)
* @return boolean OK?
*/
public function send_handshake_reply($extraHeaders = '') {
if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY']) || !isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) {
return false;
}
if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '13' && $this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '8') {
return false;
}
if (isset($this->server['HTTP_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = $this->server['HTTP_ORIGIN'];
}
if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
}
$this->write("HTTP/1.1 101 Switching Protocols\r\n"
. "Upgrade: WebSocket\r\n"
. "Connection: Upgrade\r\n"
. "Date: " . date('r') . "\r\n"
. "Sec-WebSocket-Origin: " . $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] . "\r\n"
. "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n"
. "Sec-WebSocket-Accept: " . base64_encode(sha1(trim($this->server['HTTP_SEC_WEBSOCKET_KEY']) . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true)) . "\r\n"
);
if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
$this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
}
$this->write($extraHeaders."\r\n");
return true;
}
/**
* Called when new data received
* @see http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10#page-16
* @return void
*/
public function on_read() {
if ($this->closed) return;
if ($this->state === self::STATE_PREHANDSHAKE) {
if (!$this->handshake()) {
return;
}
}
if ($this->state === self::STATE_HANDSHAKED) {
while (($buflen = strlen($this->unparsed_data)) >= 2) {
$first = ord($this->look(1)); // first byte integer (fin, opcode)
$firstBits = decbin($first);
$opcode = (int)bindec(substr($firstBits, 4, 4));
if ($opcode === 0x8) { // CLOSE
$this->close();
return;
}
$opcodeName = isset(static::$opcodes[$opcode]) ? static::$opcodes[$opcode] : false;
if (!$opcodeName) {
error_log(get_class($this) . ': Undefined opcode ' . $opcode);
$this->close();
return;
}
$second = ord($this->look(1, 1)); // second byte integer (masked, payload length)
$fin = (bool)($first >> 7);
$isMasked = (bool)($second >> 7);
$dataLength = $second & 0x7f;
$p = 2;
if ($dataLength === 0x7e) { // 2 bytes-length
if ($buflen < $p + 2) {
return; // not enough data yet
}
$dataLength = self::bytes2int($this->look(2, $p), false);
$p += 2;
} elseif ($dataLength === 0x7f) { // 8 bytes-length
if ($buflen < $p + 8) {
return; // not enough data yet
}
$dataLength = self::bytes2int($this->look(8, $p));
$p += 8;
}
if (self::maxAllowedPacket <= $dataLength) {
// Too big packet
$this->close();
return;
}
if ($isMasked) {
if ($buflen < $p + 4) {
return; // not enough data yet
}
$mask = $this->look(4, $p);
$p += 4;
}
if ($buflen < $p + $dataLength) {
return; // not enough data yet
}
$this->drain($p);
$data = $this->read($dataLength);
if ($isMasked) {
$data = $this->mask($data, $mask);
}
//Daemon::log(Debug::dump(array('ext' => $this->extensions, 'rsv1' => $firstBits[1], 'data' => Debug::exportBytes($data))));
/*if ($firstBits[1] && in_array('deflate-frame', $this->extensions)) { // deflate frame
$data = gzuncompress($data, $this->pool->maxAllowedPacket);
}*/
if (!$fin) {
$this->framebuf .= $data;
} else {
$this->on_frame($this->framebuf . $data, $opcodeName);
$this->framebuf = '';
}
}
}
}
}
class ws_v0 extends ws {
const STRING = 0x00;
const BINARY = 0x80;
protected $key;
/**
* Sends a handshake message reply
* @param string Received data (no use in this class)
* @return boolean OK?
*/
public function send_handshake_reply($extraHeaders = '') {
if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
return false;
}
$final_key = $this->_computeFinalKey($this->server['HTTP_SEC_WEBSOCKET_KEY1'], $this->server['HTTP_SEC_WEBSOCKET_KEY2'], $this->key);
$this->key = null;
if (!$final_key) {
return false;
}
if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
}
$this->write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
. "Upgrade: WebSocket\r\n"
. "Connection: Upgrade\r\n"
. "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n"
. "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n");
if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
$this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
}
$this->write($extraHeaders . "\r\n" . $final_key);
return true;
}
/**
* Computes final key for Sec-WebSocket.
* @param string Key1
* @param string Key2
* @param string Data
* @return string Result
*/
protected function _computeFinalKey($key1, $key2, $data) {
if (strlen($data) < 8) {
error_log(get_class($this) . '::' . __METHOD__ . ' : Invalid handshake data for client ""'); // $this->addr
return false;
}
return md5($this->_computeKey($key1) . $this->_computeKey($key2) . substr($data, 0, 8), true);
}
/**
* Computes key for Sec-WebSocket.
* @param string Key
* @return string Result
*/
protected function _computeKey($key) {
$spaces = 0;
$digits = '';
for ($i = 0, $s = strlen($key); $i < $s; ++$i) {
$c = substr($key, $i, 1);
if ($c === "\x20") {
++$spaces;
} elseif (ctype_digit($c)) {
$digits .= $c;
}
}
if ($spaces > 0) {
$result = (float)floor($digits / $spaces);
} else {
$result = (float)$digits;
}
return pack('N', $result);
}
/**
* Sends a frame.
* @param string $data Frame's data.
* @param string $type Frame's type. ("STRING" OR "BINARY")
* @param callable $cb Optional. Callback called when the frame is received by client.
* @callback $cb ( )
* @return boolean Success.
*/
public function send_frame($data, $type = null, $cb = null) {
if (!$this->handshaked) {
return false;
}
if ($this->closed && $type !== 'CONNCLOSE') {
return false;
}
if ($type === 'CONNCLOSE') {
if ($cb !== null) {
$cb($this);
return true;
}
}
$type = $this->get_frame_type($type);
// Binary
if (($type & self::BINARY) === self::BINARY) {
$n = strlen($data);
$len = '';
$pos = 0;
char:
++$pos;
$c = $n >> 0 & 0x7F;
$n >>= 7;
if ($pos !== 1) {
$c += 0x80;
}
if ($c !== 0x80) {
$len = chr($c) . $len;
goto char;
};
$this->write(chr(self::BINARY) . $len . $data);
}
// String
else {
$this->write(chr(self::STRING) . $data . "\xFF");
}
if ($cb !== null) {
$cb();
}
return true;
}
/**
* Called when new data received
* @return void
*/
public function on_read() {
if ($this->state === self::STATE_PREHANDSHAKE) {
if (strlen($this->unparsed_data) < 8) {
return;
}
$this->key = $this->read_unlimited();
$this->handshake();
}
if ($this->state === self::STATE_HANDSHAKED) {
while (($buflen = strlen($this->unparsed_data)) >= 2) {
$hdr = $this->look(10);
$frametype = ord(substr($hdr, 0, 1));
if (($frametype & 0x80) === 0x80) {
$len = 0;
$i = 0;
do {
if ($buflen < $i + 1) {
// not enough data yet
return;
}
$b = ord(substr($hdr, ++$i, 1));
$n = $b & 0x7F;
$len *= 0x80;
$len += $n;
} while ($b > 0x80);
if (self::maxAllowedPacket <= $len) {
// Too big packet
$this->close();
return;
}
if ($buflen < $len + $i + 1) {
// not enough data yet
return;
}
$this->drain($i + 1);
$this->on_frame($this->read($len), 'BINARY');
} else {
if (($p = $this->search("\xFF")) !== false) {
if (self::maxAllowedPacket <= $p - 1) {
// Too big packet
$this->close();
return;
}
$this->drain(1);
$data = $this->read($p);
$this->drain(1);
$this->on_frame($data, 'STRING');
} else {
if (self::maxAllowedPacket < $buflen - 1) {
// Too big packet
$this->close();
return;
}
// not enough data yet
return;
}
}
}
}
}
}
class ws_ve extends ws {
const STRING = 0x00;
const BINARY = 0x80;
/**
* Sends a handshake message reply
* @param string Received data (no use in this class)
* @return boolean OK?
*/
public function send_handshake_reply($extraHeaders = '') {
if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
}
$this->write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
. "Upgrade: WebSocket\r\n"
. "Connection: Upgrade\r\n"
. "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n"
. "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n"
);
if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
$this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
}
$this->write($extraHeaders."\r\n");
return true;
}
/**
* Computes key for Sec-WebSocket.
* @param string Key
* @return string Result
*/
protected function _computeKey($key) {
$spaces = 0;
$digits = '';
for ($i = 0, $s = strlen($key); $i < $s; ++$i) {
$c = substr($key, $i, 1);
if ($c === "\x20") {
++$spaces;
} elseif (ctype_digit($c)) {
$digits .= $c;
}
}
if ($spaces > 0) {
$result = (float)floor($digits / $spaces);
} else {
$result = (float)$digits;
}
return pack('N', $result);
}
/**
* Sends a frame.
* @param string $data Frame's data.
* @param string $type Frame's type. ("STRING" OR "BINARY")
* @param callable $cb Optional. Callback called when the frame is received by client.
* @callback $cb ( )
* @return boolean Success.
*/
public function send_frame($data, $type = null, $cb = null) {
if (!$this->handshaked) {
return false;
}
if ($this->closed && $type !== 'CONNCLOSE') {
return false;
}
if ($type === 'CONNCLOSE') {
if ($cb !== null) {
$cb($this);
return true;
}
}
// Binary
$type = $this->get_frame_type($type);
if (($type & self::BINARY) === self::BINARY) {
$n = strlen($data);
$len = '';
$pos = 0;
char:
++$pos;
$c = $n >> 0 & 0x7F;
$n >>= 7;
if ($pos !== 1) {
$c += 0x80;
}
if ($c !== 0x80) {
$len = chr($c) . $len;
goto char;
};
$this->write(chr(self::BINARY) . $len . $data);
}
// String
else {
$this->write(chr(self::STRING) . $data . "\xFF");
}
if ($cb !== null) {
$cb();
}
return true;
}
/**
* Called when new data received
* @return void
*/
public function on_read() {
while (($buflen = strlen($this->unparsed_data)) >= 2) {
$hdr = $this->look(10);
$frametype = ord(substr($hdr, 0, 1));
if (($frametype & 0x80) === 0x80) {
$len = 0;
$i = 0;
do {
if ($buflen < $i + 1) {
return;
}
$b = ord(substr($hdr, ++$i, 1));
$n = $b & 0x7F;
$len *= 0x80;
$len += $n;
} while ($b > 0x80);
if (self::maxAllowedPacket <= $len) {
// Too big packet
$this->close();
return;
}
if ($buflen < $len + $i + 1) {
// not enough data yet
return;
}
$this->drain($i + 1);
$this->on_frame($this->read($len), $frametype);
} else {
if (($p = $this->search("\xFF")) !== false) {
if (self::maxAllowedPacket <= $p - 1) {
// Too big packet
$this->close();
return;
}
$this->drain(1);
$data = $this->read($p);
$this->drain(1);
$this->on_frame($data, 'STRING');
} else {
if (self::maxAllowedPacket < $buflen - 1) {
// Too big packet
$this->close();
return;
}
}
}
}
}
}
Сразу хочу отметить, что протокол VE не тестировал — понятия не имею кто его использует. Но добросовестно сконвертировал и урезал код из PhpDeamon.
Протокол V13 используют все нормальные браузеры (FireFox, Opera, Chrome, Яндекс). Даже IE его использует (извините, после IE6 — для меня IE никогда не будет «браузером», даже команда разработчик IE заявляли, что это «не браузер, а тонкий клиент»). Протокол V0 использует браузер «Сафари».
Вместо заключения
Спасибо за внимание, используйте на здоровье весь приведенный выше код (разумеется, я советую завернуть его в нормальные объекты, тут всё упрощено исключительно для понимания. Особенно callback на пришедший от пользователя frame советую сделать по-нормальному). Если вы будете использовать этот код, напишите пожалуйста где-то в коде «Спасибо Anlide и PhpDeamon». В итоге сокет сервер, приведенный тут, совместим со всеми современными браузерами. Работает без утечек памяти и годится для использования в высоконагруженных системах.
Обновление:
- Комментарий автора статьи, на которую я постоянно ссылаюсь в тексте: habrahabr.ru/post/301822/#comment_9634636
- Метод read_lint() содержит ошибку — что мы читаем данные тела http запроса, хотя должны были читать только заголовки.
- В основном теле цикла — не корректное использование указателей при переключении протокола.
- По просьбам трудящихся вот ссылка на gitbub github.com/anlide/websocket тут код исправленный и ещё ping-pong доработанный, осталось ещё причину закрытия сокета фиксировать и заменить select на что-то — и будет замечательная смесь лучших серверных решений по websocket.
Комментарии (66)
hockfan
26.05.2016 14:07+1Я, видимо, еще из тех стариков извращенцев у кого очень сложный протокол WebSocket'а реализован на C/C++. Спасибо работодателю за терпение!
GamePad64
26.05.2016 16:56Вот, кстати, хорошая C++-библиотечка для реализации WebSockets: WebSocket++ Основана на boost.asio, умеет многое, и очень расширяемая.
Ещё в Qt есть QWebSocket. Для простеньких приложений самое то, но не полностью поддерживает RFC6455 (например, не умеет Subprotocol).
Videoman
26.05.2016 23:44Да, нет. Мы вот тоже реализовали свой велосипед на С++. Правда, даже не поняли что он сложный. Гоняем по нему видео для MSE. Отлично работает на всех клиентах, кроме IOS-based.
Fesor
26.05.2016 23:55Отлично работает на всех клиентах, кроме IOS-based.
что ж вы так отрубаете платежеспособных кастомеров?)
Справедливости ради речь у вас скорее идет не о web-сокетах а о старых добрых tcp сокетах.
Videoman
27.05.2016 01:10Ну, к счастью, клиентов на IOS у нас пока нет. Мы используем Web клиент как дополнительный тонкий клиент к основному, без процедуры установки. Все работает только во внутри-корпоративных сетях, где очень мало мобильных клиентов. Не совсем понял про «старые» советы. Веб-сокеты, действительно, реализованы поверх обычных tcp, но сверху там еще есть: handshake, отдельные сообщения с заголовками и т.д. Что делать с IOS пока не понятно, так как MSE там выпилено, даже в Хроме.
anlide
27.05.2016 01:18В этой ветке вы об этом прямо не говорите, но похоже вы поймали ту же проблему из-за которой была написана эта статья.
Я постоянно ссылаюсь на https://habrahabr.ru/post/209864/ что он хорош и прост, но safary с ним не работает. То есть IOS не работает. А чтобы работало под IOS — надо посложнее код написать. Собственно об этом «посложнее» и идёт вся эта статья.Videoman
27.05.2016 11:27Не, я немного не об этом. Веб-сокеты, как раз, работают на всех, более или менее, современных платформах. Не работает MSE (Media Source Extensions). В моем случае, к счастью, нет необходимости поддерживать старые браузеры (до принятия WebSockets, как стандарта).
bat
26.05.2016 14:08+4Спасибо за внимание, используйте на здоровье весь приведенный выше код (разумеется, я советую завернуть его в нормальные объекты, тут всё упрощено исключительно для понимания. Особенно callback на пришедший от пользователя frame советую сделать по-нормальному). Если вы будете использовать этот код, напишите пожалуйста где-то в коде «Спасибо Anlide и PhpDeamon».
Ну так оформите нормально, опубликуйте код и документацию, поддерживайте, тогда вам скажут спасибо.
А так — мыши и кактусanlide
26.05.2016 15:25-1Я вроде подчеркнул не раз в тексте, что код максимально упрощён с ООП до процедурного стиля. Соответственно в тупую использовать этот код целиком нельзя. Но в процедурном стиле — проще всего изложить что происходит в коде и читателю проще выдернуть нужные куски и вставить себе в код.
Для стиля «давайте сделаем побыстрее» — вся статья неприемлема, то есть надо использовать PhpDeamon, socketo.me или ещё какое-то готовое решение. А для стиля «давайте сделаем хорошо» — статья то, что надо. То есть читатель пишет свой сервис, у него свой основной цикл stream_select и ему в таком виде намного проще надёргать куски кода себе, чем дёргать куски кода из готового решения.
Если речь идёт про «а шо такое websocket» — то это статья не про это.
Хотя я собираюсь опубликовать на github и запостить тут ссылку туда. И есть небольшая ошибка в методе «read_line» и небольшая опечатка в указателях основного цикла — разумеется надо как-то опубликовать исправленный код.bat
27.05.2016 06:23-1Про оформление и поддержку был скорее сарказм…
Зачем придумывать и героически преодолевать трудности? Да на php можно написать http-сервер, websocket-сервер. Но зачем? Когда есть готовые решения? пусть и на других платформах. Например, nodejs — event loop и http из коробки, реализации ws на выбор, и все это отлично работает + язык знакомый для вебразработчика. Или, например, go — тоже все есть, да, может язык и незнакомый, но эффективнее по по процессору и памяти.
akeinhell
26.05.2016 14:08Не рассматривали уже готовые решения?
К примеру очень хороший вариант socketo.me/docsanlide
26.05.2016 15:14Я рассматривал только PhpDeamon на этапе проектирования.
https://habrahabr.ru/post/301822/#comment_9627298 тут написал почему отказался от его использования.
socketo.me обязательно просмотрю — может ещё чего-то важное найду.
Пока предварительная информация — но похоже в коде PhpDeamon / WebSocket / V13 есть ошибка, код некорректно реагирует на ping frame. Из-за чего FF@Linux разрывает websocket соединение. Подчёркиваю, что это требует перепроверки.
Обращаю внимание, что поиск и исправление подобной ошибки в сторонней библиотеке существенно сложнее чем в своей.
SerafimArts
26.05.2016 19:35+7Я чего-то не понимаю. Но в чём причина писать такое, вместо того, что бы написать одну строчку
composer require cboden/ratchet
? Это всё, что надо сделать, чтобы включить полную (т.е. включая устаревшие протоколы) поддержку сокетов к себе в проект.
P.S. По-моему с приходом Ratchet — PhpDaemon ну не то чтобы умер, но в предсмертном состоянии. Код первого на порядок качественнее, имхо.
Fesor
26.05.2016 20:59-1Поправка, с появлением решений вроде reactphp phpdaemon в принципе стал ненужным. Да и не поддерживается он уже довольно давно.
evnuh
26.05.2016 21:08-3Почему PHP? Если вам нужно неумирающее приложение с сохранением состояния (сервер игры), многопоточная математика и вебсокеты — то PHP — это худшее решение для вашей задачи.
SerafimArts
26.05.2016 21:09+4Не хочу показаться излишне грубым, но вы немного отстали от жизни =)
UPD: А вообще хочется аргументации почему вы так считаете? PHP7 кушает раза в 4-5 меньше оперативки, нежели nodejs, многопоточность и асинхроные плюшки есть, GC (по крайней мере в той же 7ке) наверное один из лучших, что есть на рынке. И т.д. А с приходом JIT, который сейчас в девел веточке...
evnuh
26.05.2016 21:50+1Ну, многопоточность там в зачаточном состоянии. Я так понимаю, почти всё в PHP не thread-safe, то есть локи и сериализация доступа? Как с дсотупом к коллекциям? Лочить всю коллекцию? Могу ошибаться, поправьте. Какая модель памяти? О lock-free я так понимаю тоже можно только мечтать? А какой GC в PHP7? RC? Stop-the-world?
Я сторонник производительности, и для меня писать реалтаймовый сервер игры на скриптовом языке — уже сомнительная затея.SerafimArts
26.05.2016 23:11+1О, вот тут вы меня в тупик поставили. Я просто не представляю как устроены pthreads, т.к. работал с ними исключительно сквозь вышеупомянутый react.
На счёт GC — схема простая, у пыха нет явного подсчёта (т.е. вызов gc_collect_cycles только руками) ссылок, как вследствие — фризов освобождения оной — тоже, просто при создании нового объекта под него выделяется уже неиспользуемая память. В результате получаем да, небольшие задержки при создании объектов и постоянно растущую память, когда объекты не высвобождаются, с другой стороны — вполне эффективный рантайм без явных проседаний от очистки памяти. Лично мне не удавалось вообще заспамить оперативку в рантайме, при явном отсутсвии "заботы об удалении ненужного", скажем так. Думаю, что более подробную информацию по этому вопросу может предоставить Дмитрий Стогов, я просто не компетентен во внутренностях, но на практике — оно быстро работает, без фризов и "левых" утечек, на которые я напарывался при работе с джавой.
Ну когда действительно важна производительность — да, плюсы вне конкуренции. Это вообще другой уровень и другие требования. Но сабжевая игра написана для ВК, так что и требования соответсвующие.
Fesor
26.05.2016 23:28да, плюсы вне конкуренции
А как же Си или Rust? По производительности они выигрывают. Последний в принципе неплох и в плане скорости разработки.
SerafimArts
26.05.2016 23:36В моём случае название языка было использовано в качестве нарицательного. Имел ввиду любой язык, который предоставляет прямой доступ к памяти и стеку, и не делает львиную долю работы по её управлению за того, кто использует какой-либо язык. Короче любой +- низкоуровневый. Да даже Go можно, тоже вполне себе быстрая штука.
dmalchenko
27.05.2016 11:44У меня вопрос, может кто знает какие крупные проекты используют WebSocket и long-polling? Знаю, что ВК использует и то и другое. WS хорош, но сложен в реализации. Хотелось бы узнать как long-polling работает с высокими нагрузками? (при хорошо настроенном сервере)
SerafimArts
27.05.2016 12:16ВК использует сокеты? Где? о_0 Для сообщенек и новостей long-polling only
dmalchenko
27.05.2016 12:22спасибо, long-polling видел в ВК в живую, про WS нашел на просторах гугла. Возможно знаете про другие крупные проекты? Одноклассники тоже сидят на long-polling.
SerafimArts
27.05.2016 13:39Из того что знаю: Slack, Gitter, GoodGame (чатик), Twitch (чатик) на ws. Возможно и чатик Youtube тоже (это вполне было бы логично), но не уверен.
dmalchenko
27.05.2016 14:07Спасибо! Может посоветуете что использовать при больших нагрузках (например одновременно 5-10к). Как относитесь к Server-Sent Events?
SerafimArts
27.05.2016 16:16Опыта с реально высокой нагрузкой + реалтаймом не было. Так что тут я не советчик.
Fesor
29.05.2016 00:06Насколько я помню вконтактики да фэйсбуки используют long polling просто потому что многие сидят через прокси (с работы например) которые режут весь не http траффик. Так же long polling можно считать stateless и это намного удобнее в плане распределения нагрузок.
SerafimArts
29.05.2016 04:52<irony>После беглого просмотра фконтактовских исходников kphp (https://github.com/vk-com/kphp-kdb) — я предполагаю, что у них, в частности, немного иная причина использования лонгполлинга.</irony>
morozovsk
30.05.2016 15:50anlide, в этой статье вы неоднократно даёте ссылку на мою статью Делаем вебсокеты на PHP с нуля, но это только первая статья из трёх на эту тему.
Вторая часть статьи: IPC. Межпроцессное взаимодействие
Третья часть статьи: От чата до игры: Battle City
Все они были написаны в начале 2014 года, а по их результатам создана библиотека и опубликована на гитхабе, в которой уже было учтено всё, что вы описываете ещё в начале 2014 года.
На всякий случай я попросил друга проверить работу моей библиотеки в Safari и она работает. В той же википедии пишут, что вебсокеты версии 13 (теперь эта версия закреплена в RFC 6455) поддерживаются в Apple Safari начиная с версии 6, которая вышла 2012 году, за два года до написания моих статей.
Ссылка на библиотеку также есть в каждой статье. Библиотека поддерживает только протокол вебсокетов и не поддерживает long polling, т.к. подавляющее большинство браузеров поддерживает вебсокеты, о чём я писал в третьей статье и приводил цифры (а это было напомню ещё в начале 2014 года, а сейчас 2016 год на дворе).
Также на странице библиотеки приведены ссылки на демки игр, которые физически не могли бы работать, если бы обладали описанными вами изъянами.
В вашей реализации вы используете socket_select, что не даёт возможности обслуживать более тысячи одновременных соединений без перекомпиляции php. В моей библиотеке также поддерживаются расширения pecl/event и pecl/libevent, что устраняет эти ограничения.
И да, моя библиотека протестирована на php седьмой версии и используется в продакшене уже несколько лет и не только мной, а демки висели без перезапуска больше года (до момента смены мною хостинга) и утечек памяти за этот период зафиксировано не было.
Сложилось впечатление, что статья была написана 2 года назад, а опубликована сейчас. Прошу добавить мой комментарий в конец статьи в качестве опровержения, чтобы не вводить читателей в заблуждение.anlide
30.05.2016 17:43+1Спасибо за разъяснения, мне много стало понятнее.
22 янв 2016 я подал очередную заявку на публикацию игры в vk. По логам вижу: зашёл админ и сразу закрыл игру. Вот его useragent «Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36». У меня windows. Скачал последнюю версию safari, что нашёл в инете на тот момент: 5.1.7 версии.
Как теперь видно — админ vk не поймал ошибку соединения, но ему что-то другое не правилось, а я поймал такую ошибку. В итоге сделал неправильный вывод (что websocket под safari не сработал) и неправильные дальнейшие мои действия.
Должен признать, что это мой огромный провал.
В статье вы используете stream_select, а в коде на github используете EventBuffer. Но впрочем я уже созрел написать на тему других проблем, с которыми я столкнулся (правильное использование cocos js и вынесение задач в отдельный процесс). Но… а что за ограничения socket_select/stream_select на количество соединений?morozovsk
30.05.2016 19:06Если в socket_select передавать дескрипторы с номером больше 1024 (т.е. открыто более 1024 соединений), то выдаётся такая ошибка:
PHP Warning: socket_select(): You MUST recompile PHP with a larger value of FD_SETSIZE. It is set to 1024, but you have descriptors numbered at least as high as 1024.
И судя по всему указанный вами user-agent принадлежит не сафари как могло показаться на первый взгляд, а хрому, что вполне логично — тестировать работоспособность приложения в самом используемом браузере.
http://www.useragentstring.com/Chrome35.0.1916.47_id_19788.php
https://github.com/gorhill/uMatrix/wiki/Latest-user-agent-strings
http://www.useragentstring.com/pages/Chrome/
http://www.webapps-online.com/online-tools/user-agent-strings/dv/operatingsystem51847/os-xanlide
02.06.2016 03:00+1Большое спасибо за замечание. Долго думал, что с этим делать. На собственном хостинге увеличивать количество файловых дейскрипторов для любого приложения можно без проблем. Поэтому видимо надо просто следить за этой проблемой и должный report об этой ошибке сделать. И под нагрузкой socket_select использовать можно будет с таким подходом. Да?
Игра была запущена по схеме «тихий старт». То есть нигде не постится новость о новой игре, а в недрах раздела «все игры» — собственно и появляется игра. На данный момент посмотрело игру ~1750 человек. Google analytic пишет, что safary с версией 5 было аж 1 пользователь за всё время (ниже пятой версии пользователей не было). Очевидно это был мой браузер. И проблемы со старым протоколом на самом деле неактуальны.morozovsk
02.06.2016 14:39Эта ошибка пользователя, а не баг в php (не нужно никуда писать репорты), именно поэтому генерируется PHP Warning, а не падает сам php. Эта ошибка сообщает пользователю, что socket_select нельзя использовать с дескрипторами с номером больше 1024. Это сделано потому что php-функция socket_select делает системный вызов select, а он работает используя (насколько помню) системный вызов poll, который не очень эффективный, особенно на большом количестве соединений. В системе есть более эффективные методы обработки — epoll, kqueue, kevent и т.д.
Конечно можно перекомпилировать php с увеличенной константой FD_SETSIZE, но тогда будет неэффективно использоваться процессор. Гораздо лучше использовать расширение pecl/event (которое уже обновили для его работы с php7).
pecl/event — это обёртка над системной библиотекой libevent (которую использует memcached, chrome, tor и т.д.)
В своей библиотеке я добавил поддержку pecl/event и возможность переключаться на неё с socket_select и обратно. Это помогает новичкам запустить свой вебсокет-сервер без установки дополнительных модулей, а когда нагрузка увеличится, то донастроить сервер и переключиться на более эффективный механизм.anlide
03.06.2016 03:37В вашем коде я вижу 3 реализации WebSocket — socket_select, event, libevent. Чем отличаются event и libevent? Какой лучше выбрать? Почему?
На первый взгляд libevent больше нравится — потому что PhpStorm о нём знает, а про event — нет. :)
akubintsev
02.06.2016 17:24Я так и не понял в чем проблема с Сафари.
Делал чатик с помощью Ratchet, замечательно работает под этим браузером.anlide
02.06.2016 17:50Уже разобрались. Пол года назад, последняя версия safary под windows была пятёрка. Под ней нужен старый протокол. Кстати Ratchet его поддерживает.
Но кстати я изучил код Ratchet и он не поддерживает не шифрованные (masked) данные. А в протоколе указано как поддерживать это (и PhpDeamon и код приведенный тут умеет это). Может ещё чего всплывёт по ходу ковыряния его исходника.
Ну вобщем я статью обновлю — как все вылезшие подробности прояснятся.akubintsev
02.06.2016 18:45На мой взгляд это не такая серьезная проблема, хотя реализация не помешала бы. Достаточно использовать WSS и тут, правда, без nginx никак, а также настроить фильтрацию origin domain.
Mannaward91
02.06.2016 20:38Метод read_lint() содержит ошибку — что мы читаем данные тела http запроса, хотя должны были читать только заголовки.
В основном теле цикла — не корректное использование указателей при переключении протокола.
Если кто-нить исправил эти ошибочки, скиньте кусочки кода пожалуйста.
anlide
04.06.2016 04:22Запостил на github https://github.com/anlide/websocket
Ещё надо будет с двумя вопросами разобраться — это socket_select под нагрузкой и причина закрытия сокета. Ещё расставить кучу комментариев по коду и вопрос можно будет закрывать.Fesor
04.06.2016 10:29+1socket_select под нагрузкой
А в чем именно вопрос? socket_select под нагрузкой нормально справляется. Не так как libev например (его можно заюзать) но норм. Заменять бы я рекомендовал просто на готовую библиотеку для реализации event loop. Более оптимального решения с точки зрения нагрузок для задач в духе web-сокетов как бы и нет.
Ещё расставить кучу комментариев по коду и вопрос можно будет закрывать.
Лучше пользуйтесь осмысленными названиями переменных и методов, а так же выносите части в приватные методы, используйте константы и все такое и комментарии будут просто не нужны.
anlide
04.06.2016 14:44https://habrahabr.ru/post/301822/#comment_9635078 вот жи человек говорит, что 1000 соединений и тю-тю, надо пересобирать php. И при этом он ещё и медленный. Так где же правда?
На тему комментариев — я с вами согласен. Наверное стоит поудалять все комментарии, написать свои только в сложных местах, в большей части кода сделать говорящие названия переменных.Fesor
04.06.2016 19:30Ну если у вас более 1000 соединений, тут проще использовать libev или libevent. Экстеншены ставить из пекла не особо то сложно, в обращении они не намного сложнее stream_select, так что...
А по поводу пересборки — это не такая большая проблема на самом деле) Особенно если использовать Docker — один раз собрал и хорошо.
morozovsk
05.06.2016 01:01libevent — не вариант, об этом я уже выше писал https://habrahabr.ru/post/301822/#comment_9641462
по поводу пересборки — это всё таки проблема. Совсем недавно вышла php 7.0.7, а до неё 7.0.6 и 7.0.5 и т.д. и все они — «security release», так что перекомпилировать каждый раз php вместо того чтобы обновить из пакета — это лишние действия, а если не обновлять то будет решето.
За libev спасибо, не знал, хотя его автор сделал также и event, который я использовал ранее.
omatosan
А почему отказались от использования PhpDeamon?
anlide
Потому что в этой задаче необходимо выполнять очень много математики под нагрузкой. Соответственно необходимо было сделать многопоточность в рамках php (напоминаю, что это делается только созданием новых потоков). А это в свою очередь требует использовать stream_select общий для дочерних процессов, и для websocket, и для админки. И этот подход мне хорошо знаком.
С другой стороны PhpDeamon в основе использует EventBuffer — с которым я совсем незнаком и грамотное использование этого дела под нагрузкой — это неизвестно сколько времени и нервов и потерянных пользователей.
Fesor
pthreads не рассматривали? Ну мол что бы вынести вычисления в отдельный пул тредов и разгрузить воркер обработки запросов. Внутри потоков завести очереди, и разруливать кто что делает. Можно обойтись без каких-либо локов и т.д. и получить максимальную утилизацию CPU.
тут вы подразумеваете именно стримы, потоки ввода/вывода. Я раза 3 перечитал ваш комментарий что бы понять к чему тут потоки, дочерние и stream_select.
Ну то есть в приведенном вами коде у нас один процесс, один поток, и неблокируемые потоки ввода/вывода. Никакой многопоточности. Как и дочерних процессов собственно.
anlide
pthreads к сожалению не рассматривал. Отстал от жизни, изучу это. Бегло осмотрел — похоже это то, что мне было надо, а я по сути написал и оттестировал самодельный pthreads на чистом php.
«напоминаю, что это делается только созданием новых потоков» очень извиняюсь, опечатался.
«напоминаю, что это делается только созданием новых процессов» я хотел сказать.
В приведенном коде да, stream_select работает только с WebSocket. Можно наверное больше кода игрового сервера привести «аналог pthreads» — оно кому-то интересно?
Игровой сервер используется как сервис linux. То есть "/etc/init.d/game restart". Соответственно игровой сервер корректно реагирует на события из ОСи (выключение, перезагрузка, перезапуск сервиса). Работает мастер-процесс и несколько дочерних процессов (по количеству процессоров + 1 для очень долгих и низко-приоритетных задач). Приведенный в статье код — это кусок мастера.
Fesor
Так, давайте все же проясним. Вот способы работы с вводом/выводом в порядке эффективности (последний — самый эффективный):
По сути вы последнее и реализовали, правда есть более эффективные реализации (reactphp + libev/libuv) но это уже мелочи.
То есть для web-sockets как раз таки ваш подход очень хорошо подходит. Просто что бы вычисления не стопорили обработку запросов — можно было это дело мувнуть в отдельный трэд. Тогда и накладных расходов на взаимодейтсвие было бы меньше и работало оно независимо.
anlide
Да «event loop» наверное можно назвать. Разумеется это всё надо, чтобы обработка запросов не стопорила общение клиентов с сервером. Темболее в данном проекте некоторые рядовые запросы могут обрабатываться несколько секунд.
reactphp websocket — он же http://socketo.me/. Уже советовали посмотреть, посмотрю чем он лучше. Кроме того, что они поймали и пофиксили кучу багов — есть инфа чем он лучше, чем самодельный event loop?
Fesor
1) проверенный в бою
2) не нужно поддерживать самому
3) есть готовый адаптер для libev/libuv. К сожалению тот же libuv имел бы для вас смысл если бы помимо парсинга http он предоставлял так же обработку websockets. Но для ребят, которые пишут на неумираемом PHP он неплохо облегчает жизнь.
anlide
Читаю вот код из «socketo.me» — там ребята тоже очень не любят Сафари :)
Вот камменты в протоколе для Сафари:
Ща внимательно всё просмотрю, может тут реализован последний протокол так, что нет ошибки о которой я писал «firefox@linux ping frame» у PhpDeamon.
Кроме того меня очень радует, что названия протоколов нормальные:
anlide
Вот рассмотрел повнимательнее. Попробовал. Воодушевился, собрался переписывать всё с использованием потоков. Но…
В документации и примерах нет информации как ворочать большие объёмы данных между потоками. Или я не нашёл?
Приведенный код не работает — то есть выводит «true», хотя я ожидаю «false».
Мне очень хотелось бы, чтобы переменная $var не дублировалась, не синхронизировалась, а именно была бы в родительском потоке. Ну вобщем любым способом избежать дублирования данных в физической памяти. Это возможно в pthreads или нет?
Borro
Посмотрите на этот пример. Так же есть хорошая статья на тему.
anlide
Я взял этот пример и чуть подправил его. И попробовал в разном месте по разному расставить memory_get_usage.
И выяснилось, что если мы записываем ответ getStorage в локальную переменную, например $data — то расхода памяти нет. Как будто ссылка передалась.
Если записать в $this->data — то данные полностью копируются.
Попытка использовать "&" приводит к крашу CLI.
Возвращаясь к $data я попробовал перезаписать данные пустым массивом — не получается. То есть локальная переменная становится пустым массивом, а в Connect::$storage продолжает лежать большой кусок данных.
Вывод какой — если используется Pool, то конкретные задачи могут получить данные только для чтения из Worker без потери памяти. Запись в Worker невозможно.
Вывод правильный?
Borro
Немного не понял.
Запоминаем в треды, потом используем в основном потоке:
Выдает:
Start
Work result: 7905944565329289890
Work result: 5108499292478502335
Work result: 5714338237394927753
Work result: 7690405846878811114
Work result: 8972815893492806475
Work result: 7717729732644902772
Work result: 3482825673580680883
Work result: 5913335648957882133
Work result: 7176745494439780246
Work result: 3082331963992595859
End at 3.005893945694
Попробовал изменить ваш код. В каждый воркер можно писать:
Вывод:
int(1)
int(1)
int(1)
int(1)
int(2)
int(2)
anlide
В первом примере данные хранятся в самих потоках, там же запись в них. Далее чтение этих данных из основного потока, я так понимаю без физического копирования данных.
Во втором примере данные хранятся в экземплярах класса Connect и конкретная задача их успешно читает, но это чтение данных внутри одного потока. А запись осуществляется дёрганьем метода, а не напрямую.
Но я собственно на такое чтение данных и не жаловался. Как мне в дочернем потоке почитать данные из родительского потока не дублируя при этом данные в физической памяти? Как мне из дочернего потока изменить данные в родительском потоке? Без этого для моей задачи использовать pthreads бессмысленно. И если погуглить «c++ многопоточность память» то видно, что в C++ таких проблем нет.
К чему этот разговор вообще веду. Вот у меня есть реализованый механизм распределения задач в несколько процессов, которые общаются через сокеты. Общаются посылая сигналы друг-другу. Это собственно первая проблема — что общаться они должны только сигналами. Это очень неудобно и громоздко. Большие куски данных хранятся там где собственно используются (например данные по конкретному игроку, а они довольно объёмные), если другому процессу нужны данные из этого процесса — то он их тянет из БД или процесса, поработает с ними и удаляет из своей памяти. Это вторая проблема — хотелось жеж бы просто по id пользователя обратится куда-то в памяти, получить нужные данные и всё, не дублируя объёмные данные, не реализуя сигналы. Кроме того, есть большой массив констант типа тексты заданий, куча настроек игрового процесса и т.д. — это тоже желательно не дублировать, один экземпляр на процесс.
Так вот я надеялся, что pthread решат эти две проблемы. Но похоже он не решает ни того ни другого и добавляют ещё одну. Я надеялся, что у меня будет одна большая переменная с текстами-настройками игры, будет один большой массив с игроками. И потоки будут просто получать задачи и указатели на память и работать над ними. Не дублируя данные. Код был бы маленький и очень простой. Память использовалась бы максимально эффективно. Процессоры при необходимости загружались бы на максимум. Новые фичи писать в такой системе было бы легко.
Но что же это получается с pthreads — поток должен к себе полностью скопировать всё, обработать, изменить и отправить обратно? Опять сигналы вместо прямого обращения к данным? Далее по конкретному игроку — пока он онлайн, для него формируется большой кэш данных, который ещё и меняется постоянно, его тоже надо весь скопировать в поток, а потом записать обратно? И бонусная проблема — тексты-настройки игры тоже надо копировать целиком в поток?
Я надеюсь что ошибаюсь. Или нет?
Borro
Понял. Тогда может быть пример с каналом вам подойдет? Вообще в примерах много интересного можно найти.
Fesor
Разработчики pthreads специально сделали это ограничение, каждый поток со своим стораджем для переменных. Раньше можно было "расшаривать" данные делая их глобальными, но эту возможность убрали. Теперь можно только пересылать данные между потоками. И это правильно и сильно уменьшает риск выстрела в ногу.
SerafimArts
Кстати, раз уж заговорили о pthreads — не сталкивался никогда с тем, что оно не позволяет передавать между потоками сложные структуры данных, вроде объектов (включая инстансы \Closure)?
У меня такое ощущение сложилось, что оно при передаче данных между потоками — сериализует их, а потом распаковывает, отсюда и косяк с замыканиями.
Fesor
Можно передавать только то что можно сериализовать. Сталкивался с этим пару лет назад когда пытался вынести доктрину в отдельный пул потоков для организации асинхронной работы… и проиграл поскольку сущности с проксями передать не выходило (там entity manager у каждой прокси есть).
именно так внутри и происходит и об этом есть в документации)
Fesor
PhpDeamon на сегодняшний день морально устаревшее решение.