10 месяцев назад я начал делать браузерную игрушку. Выбор пал на cocos js в качестве графики и websocket в качестве общения с сервером. Технология очень понравилась и я на ней организовал всё общение игры с сервером. Использовал для этого эту статью. Но, к сожалению, тот код, который приведен в той статье, нельзя использовать в продакшене. Как выяснилось, уровень проблемы даже не критический, а блокирующий. Всё настолько плохо, что мне пришлось переписывать всё общение с сервером с вебсокетов на longpooling. В итоге я оставил вариант «если у нас браузер не сафари, то использовать websocket, иначе longpolling» и ещё немного ветвления на эту тему.

Так что опыт использования вебсокет в продакшене накопился приличный. И вот недавно случилось событие, которое сподвигло меня написать первую статью на Хабре.

После того, как игрушка была опубликована в социальной сети, я поправил все найденные критические/блокирующие баги и начал приводить всё в порядок в спокойном режиме. Я хочу обратить внимание на то, что этот вот пример — это вообще единственный в интернете гайд, который содержит серверный код, который можно вставить себе в код и использовать его. Ну вот набрать в поисковике «php websocket server» — попробуйте что-то найти, что можно себе поставить.

Внезапно я перечитываю указанную выше статью и в самом начале обнаруживаю ссылки на «phpdaemon» и «ratchet». Ну думаю, давай в спокойном режиме посмотрю на код тамошний. В PhpDeamon в недрах обработки WebSocket соединения небольшое, но безумно важное ветвление на протоколы WebSocket. И там прямо написано для одного case «Safari5 and many non-browser clients». Сказать, что я офигел — это ничего не сказать. Перед глазами пронеслись несколько сотен часов, тонны нервотрёпки и страдания, которые поставили под вопрос вообще проект. Я не поверил, решил проверить.

В течении ~15 часов я вытянул из PhpDeamon минимальный код, связанный с WebSocket (который работает во всех браузерах последней версии, а сам серверный код может работать под высокой нагрузкой) и его постараюсь опубликовать с объяснениями. Чтобы другие люди не испытали те мучения, через которые мне пришлось пройти. Да, кусок кода получился не маленький, но извините: WebSocket он на клиентской части очень простой, а на стороне сервера всё довольно объёмно (скажем отдельное «спасибо» разработчикам Сафари). Также в связи с тем, что область применения WebSocket — это в первую очередь игры, важен вопрос неблокирующего использования серверного сокета — это бонусная сложность, которая никак здесь не рассматривается, хотя и очень важна.

Тестовое приложение я хотел написать без объектов, чтобы было понятнее. Но, к сожалению, такой подход в данном примере расплодит много повторяющегося кода, поэтому пришлось добавить 1 класс и 3 его наследника. Остальное всё без объектов.

Для начала клиентсная часть
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
  <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
  <title>WebSocket test page</title>
</head>
<body onload="create();">
<script type="text/javascript">
  function create() {
    // Example
    ws = new WebSocket('ws://'+document.domain+':8081/');
    ws.onopen = function () {document.getElementById('log').innerHTML += 'WebSocket opened <br/>';}
    ws.onmessage = function (e) {document.getElementById('log').innerHTML += 'WebSocket message: '+e.data+' <br/>';}
    ws.onclose = function () {document.getElementById('log').innerHTML += 'WebSocket closed <br/>';}
  }
</script>
<button onclick="create();">Create WebSocket</button>
<button onclick="ws.send('ping');">Send ping</button>
<button onclick="ws.close();">Close WebSocket</button>
<div id="log" style="width:300px; height: 300px; border: 1px solid #999999; overflow:auto;"></div>
</body>
</html>


В моей игре мне пришлось использовать 3 сокет сервера. Для websocket, для worker`ов и для longpooling. В игре очень много математики, поэтому надо было делать вёркеры и выдавать им задачи на вычисления. Так вот к чему это. Что stream_select для них всех должен быть общий, иначе будут лаги или безумное использование процессора. Это знание тоже было получено взамен кучи истраченных нервов.

Основной цикл сервиса
$master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr);
if (!$master) die("$errstr ($errno)\n");
$sockets = array($master);
stream_set_blocking($master, false); // Относительно этой команды я не уверен, потому что мастер из сокетов читает только новые соединения, и для чтения используется "stream_socket_accept". Вариант, что весь сервис будет подвешен на несколько секунд из-за того, что клиент не торопится соединятся - категорически неприемлемо.
while (true) {
  $read = $sockets;
  $write = $except = array();
  if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) {
    var_dump('stream_select error');
    break;
    // Сделать выход из цикла, а не "die", потому что в продакшине скорей всего этот код будет выполняться как сервис и при команде "/etc/init.d/game restart" тут 100% будет этот case, так вот надо дать "pcntl" код нормально отработать и не мешать ему.
  }
  foreach ($read as $socket) {
    $index_socket = array_search($socket, $sockets);
    if ($index_socket == 0) {
      // Новое соединение
      continue;
    }
    // Тут будет обработка сообщений клиентов
  }
}


Соединение с новыми клиентами вполне себе стандартный код, но вот из-за того, что сокеты у нас в неблокирующем режиме, нужно написать кучу кода, который по кусочкам соберёт все входящие данные и, когда данных будет достаточно, обработает их, поймёт какой протокол надо использовать и переключится на использование этого протокола. Одна эта задача — уже гора кода, и в PhpDeamon нагородили много кода, который к WebSocket отношения не имеет (они же там 8 разных серверов умеют подымать). Удалось многое отрезать и упростить в этой теме. Оставил только то, что относится к WebSocket.

Файл урезанный <ws.php>
class ws {
  const MAX_BUFFER_SIZE = 1024 * 1024;

  protected $socket;

  /**
   * @var array _SERVER
   */
  public $server = [];

  protected $headers = [];

  protected $closed = false;
  protected $unparsed_data = '';
  private $current_header;
  private $unread_lines = array();

  protected $extensions = [];
  protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS';

  /**
   * @var integer Current state
   */
  protected $state = 0; // stream state of the connection (application protocol level)

  /**
   * Alias of STATE_STANDBY
   */
  const STATE_ROOT = 0;

  /**
   * Standby state (default state)
   */
  const STATE_STANDBY = 0;

  /**
   * State: first line
   */
  const STATE_FIRSTLINE  = 1;

  /**
   * State: headers
   */
  const STATE_HEADERS    = 2;

  /**
   * State: content
   */
  const STATE_CONTENT    = 3;

  /**
   * State: prehandshake
   */
  const STATE_PREHANDSHAKE = 5;

  /**
   * State: handshaked
   */
  const STATE_HANDSHAKED = 6;

  public function get_state() {
    return $this->state;
  }

  public function closed() {
    return $this->closed;
  }

  protected function close() {
    if ($this->closed) return;
    var_dump('self close');
    fclose($this->socket);
    $this->closed = true;
  }
  public function __construct($socket) {
    stream_set_blocking($socket, false);
    $this->socket = $socket;
  }

  private function read_line() {
    $lines = explode(PHP_EOL, $this->unparsed_data);
    $last_line = $lines[count($lines)-1];
    unset($lines[count($lines) - 1]);
    foreach ($lines as $line) {
      $this->unread_lines[] = $line;
    }
    $this->unparsed_data = $last_line;
    if (count($this->unread_lines) != 0) {
      return array_shift($this->unread_lines);
    } else {
      return null;
    }
  }
  public function on_receive_data() {
    if ($this->closed) return;
    $data = stream_socket_recvfrom($this->socket, MAX_BUFFER_SIZE);
    if (is_string($data)) {
      $this->unparsed_data .= $data;
    }
  }
  /**
   * Called when new data received.
   * @return void
   */
  public function on_read() {
    if ($this->closed) return;
    if ($this->state === self::STATE_STANDBY) {
      $this->state = self::STATE_FIRSTLINE;
    }
    if ($this->state === self::STATE_FIRSTLINE) {
      if (!$this->http_read_first_line()) {
        return;
      }
      $this->state = self::STATE_HEADERS;
    }

    if ($this->state === self::STATE_HEADERS) {
      if (!$this->http_read_headers()) {
        return;
      }
      if (!$this->http_process_headers()) {
        $this->close();
        return;
      }
      $this->state = self::STATE_CONTENT;
    }
    if ($this->state === self::STATE_CONTENT) {
      $this->state = self::STATE_PREHANDSHAKE;
    }
  }
  /**
   * Read first line of HTTP request
   * @return boolean|null Success
   */
  protected function http_read_first_line() {
    if (($l = $this->read_line()) === null) {
      return null;
    }
    $e = explode(' ', $l);
    $u = isset($e[1]) ? parse_url($e[1]) : false;
    if ($u === false) {
      $this->bad_request();
      return false;
    }
    if (!isset($u['path'])) {
      $u['path'] = null;
    }
    if (isset($u['host'])) {
      $this->server['HTTP_HOST'] = $u['host'];
    }
    $srv                       = & $this->server;
    $srv['REQUEST_METHOD']     = $e[0];
    $srv['REQUEST_TIME']       = time();
    $srv['REQUEST_TIME_FLOAT'] = microtime(true);
    $srv['REQUEST_URI']        = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : '');
    $srv['DOCUMENT_URI']       = $u['path'];
    $srv['PHP_SELF']           = $u['path'];
    $srv['QUERY_STRING']       = isset($u['query']) ? $u['query'] : null;
    $srv['SCRIPT_NAME']        = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/';
    $srv['SERVER_PROTOCOL']    = isset($e[2]) ? $e[2] : 'HTTP/1.1';
    $srv['REMOTE_ADDR']        = null;
    $srv['REMOTE_PORT']        = null;
    return true;
  }
  /**
   * Read headers line-by-line
   * @return boolean|null Success
   */
  protected function http_read_headers() {
    while (($l = $this->read_line()) !== null) {
      if ($l === '') {
        return true;
      }
      $e = explode(': ', $l);
      if (isset($e[1])) {
        $this->current_header                = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_']));
        $this->server[$this->current_header] = $e[1];
      } elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) {
        // multiline header continued
        $this->server[$this->current_header] .= $e[0];
      } else {
        // whatever client speaks is not HTTP anymore
        $this->bad_request();
        return false;
      }
    }
  }
  /**
   * Process headers
   * @return bool
   */
  protected function http_process_headers() {
    $this->state = self::STATE_PREHANDSHAKE;
    if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) {
      $str              = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']);
      $str              = preg_replace($this->extensionsCleanRegex, '', $str);
      $this->extensions = explode(', ', $str);
    }
    if (!isset($this->server['HTTP_CONNECTION'])
      || (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade")
      || !isset($this->server['HTTP_UPGRADE'])
      || (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important
    ) {
      $this->close();
      return false;
    }
    if (isset($this->server['HTTP_COOKIE'])) {
      self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie);
    }
    if (isset($this->server['QUERY_STRING'])) {
      self::parse_str($this->server['QUERY_STRING'], $this->get);
    }
    // ----------------------------------------------------------
    // Protocol discovery, based on HTTP headers...
    // ----------------------------------------------------------
    if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI
      if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14)
        $this->switch_to_protocol('v13');
      } elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol
        $this->switch_to_protocol('v13');
      } else {
        error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr
        $this->close();
        return false;
      }
    } elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
      $this->switch_to_protocol('ve');
    } else { // Defaulting to HIXIE (Safari5 and many non-browser clients...)
      $this->switch_to_protocol('v0');
    }
    // ----------------------------------------------------------
    // End of protocol discovery
    // ----------------------------------------------------------
    return true;
  }
  private function switch_to_protocol($protocol) {
    $class = 'ws_'.$protocol;
    $this->new_instance = new $class($this->socket);
    $this->new_instance->state = $this->state;
    $this->new_instance->unparsed_data = $this->unparsed_data;
    $this->new_instance->server = $this->server;
  }
  /**
   * Send Bad request
   * @return void
   */
  public function bad_request() {
    $this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>");
    $this->close();
  }
  /**
   * Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX
   * @param  string  $s      String to parse
   * @param  array   &$var   Reference to the resulting array
   * @param  boolean $header Header-style string
   * @return void
   */
  public static function parse_str($s, &$var, $header = false)
  {
    static $cb;
    if ($cb === null) {
      $cb = function ($m) {
        return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8'));
      };
    }
    if ($header) {
      $s = strtr($s, self::$hvaltr);
    }
    if (
      (stripos($s, '%u') !== false)
      && preg_match('~(%u[a-f\d]{4}|%[c-f][a-f\d](?!%[89a-f][a-f\d]))~is', $s, $m)
    ) {
      $s = preg_replace_callback('~%(u[a-f\d]{4}|[a-f\d]{2})~i', $cb, $s);
    }
    parse_str($s, $var);
  }
  /**
   * Send data to the connection. Note that it just writes to buffer that flushes at every baseloop
   * @param  string  $data Data to send
   * @return boolean       Success
   */
  public function write($data) {
    if ($this->closed) return false;
    return stream_socket_sendto($this->socket, $data) == 0;
  }
}


Смысл этого класса в таком урезанном виде — в конструкторе установить неблокирующий режим для соединения с клиентом. Далее в основном цикле, каждый раз, когда приходят данные — сразу их прочитать и положить (дополнить) в «unparsed_data» переменную (это метод on_receive_data). Важно понимать, что если мы выйдем за размеры MAX_BUFFER_SIZE, то вообще ничего страшного не случится. Можно в итоговом примере, что тут будет, поставить его значение, скажем, «5» и убедится, что всё по-прежнему работает. Просто данные из буфера на первом шаге будут проигнорированы — они ведь неполные будут, и со второго или пятого или сотого захода наберутся, наконец, все принятые данные и будут обработаны. При этом stream_select в основном цикле ждать не будет даже микросекунды, пока все данные не будут извлечены. Константу надо подобрать такую, чтобы 95% ожидаемых данных читались целиком.
Далее в основном цикле (после получения очередной порции данных) мы пробуем накопленные данные обработать (это метод on_read). В классе «ws» метод «on_read» состоит по сути из трёх шагов: «читаем первую строку и готовим переменные окружения», «читаем все заголовки», «обрабатываем все заголовки». Первые 2 пояснять не надо, но написаны они довольно объёмно потому, что мы в неблокирующем режиме и надо быть готовым к тому, что данные оборваны в любом месте. Обработка заголовков сначала проверяет формат запроса правильный или нет, а потом по заголовкам определяет протокол, по которому будет общаться с клиентом. В итоге должны дёрнуть метод switch_to_protocol. Этот метод внутри себя сформирует экземпляр класса «ws_<протокол>» и подготовит его для отдачи в основной цикл.

В основном цикле далее надо собственно проверить: а не надо ли подменить объект (если кто-то может предложить реализацию этого места лучше — всегда пожалуйста).

Далее в основном цикле надо поставить проверку: а не закрыт ли сокет. Если закрыт, то очистить память (об этом дальнее в следующем блоке).

Теперь полная версия файла <deamon.php>
require('ws.php');
require('ws_v0.php');
require('ws_v13.php');
require('ws_ve.php');
$master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr);
if (!$master) die("$errstr ($errno)\n");
$sockets = array($master);
/**
 * @var ws[] $connections
 */
$connections = array();
stream_set_blocking($master, false);
/**
 * @param ws $connection
 * @param $data
 * @param $type
 */
$my_callback = function($connection, $data, $type) {
  var_dump('my ws data: ['.$data.'/'.$type.']');
  $connection->send_frame('test '.time());
};
while (true) {
  $read = $sockets;
  $write = $except = array();
  if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) {
    var_dump('stream_select error');
    break;
  }
  foreach ($read as $socket) {
    $index_socket = array_search($socket, $sockets);
    if ($index_socket == 0) {
      // Новое соединение
      if ($socket_new = stream_socket_accept($master, -1)) {
        $connection = new ws($socket_new, $my_callback);
        $sockets[] = $socket_new;
        $index_new_socket = array_search($socket_new, $sockets);
        $connections[$index_new_socket] = &$connection;
        $index_socket = $index_new_socket;
      } else {
        // Я так и не понял что в этом случае надо делать
        error_log('stream_socket_accept');
        var_dump('error stream_socket_accept');
        continue;
      }
    }
    $connection = &$connections[$index_socket];
    $connection->on_receive_data();
    $connection->on_read();
    if ($connection->get_state() == ws::STATE_PREHANDSHAKE) {
      $connection = $connection->get_new_instance();
      $connections[$index_socket] = &$connection;
      $connection->on_read();
    }
    if ($connection->closed()) {
      unset($sockets[$index_socket]);
      unset($connections[$index_socket]);
      unset($connection);
      var_dump('close '.$index_socket);
    }
  }
}


Тут добавлен "$my_callback" — это наш custom обработчик сообщений от клиента. Разумеется в продакшине можно завернуть это всё в объекты всякие, а тут чтобы было понятнее просто переменная-функция. О ней чуть позже подробнее.

Реализована обработка нового соединения и реализовано основное тело цикла, о котором я чуть выше писал.

Я хочу обратить внимание на код сервера тут. Что если прочтённые данные из сокета — это пустая строка (да, разумеется я видел там в update проверку на пустую строку), то сокет надо закрыть. Ох, я даже не знаю, сколько этот момет попил мне кровушки и скольких пользователей я потерял. Внезапнейшим образом Сафари отправляет пустую строку и считает это нормой, а этот код берёт и закрывает соединение пользователю. Яндекс-браузер иногда ведёт себя так же. Уж не знаю почему, но в этом случае для Сафари WebSocket остаётся зависшим, то есть он не закрывается, не открывается — просто висит и всё. Вы уже заметили, что я неравнодушен к этому волшебному браузеру? Мне вспоминается, как я верстал под IE6 — примерно такие же ощущения.

Теперь о том, зачем я использую array_search и синхронизирую массив $sockets и массив $connections. Дело в том, что stream_select жизненно необходим чистый массив $sockets и никак иначе. Но как-то надо же связать конкретный сокет из массива $sockets с объектом «ws». Перепробовал кучу вариантов — в итоге остановился на таком варианте, что есть 2 массива, которые постоянно синхронизированы по ключам. В одном массиве неоходимые чистые сокеты для stream_select, а во втором экземпляры класса «ws» или его наследники. Если кто-то может предложить это место лучше — предлагайте.

Ещё отдельно надо отметить случай, когда stream_socket_accept зафэйлился. Я так понимаю, теоретически это может быть только в том случае, если мастер сокет у нас в неблокирующем режиме, и приехало недостаточно данных для соединения клиента. Поэтому просто ничего не делаем.

Полная версия файла <ws.php>
class ws {
  private static $hvaltr = ['; ' => '&', ';' => '&', ' ' => '%20'];

  const maxAllowedPacket = 1024 * 1024 * 1024;
  const MAX_BUFFER_SIZE = 1024 * 1024;

  protected $socket;

  /**
   * @var array _SERVER
   */
  public $server = [];

  protected $on_frame_user = null;

  protected $handshaked = false;

  protected $headers = [];
  protected $headers_sent = false;

  protected $closed = false;
  protected $unparsed_data = '';
  private $current_header;
  private $unread_lines = array();
  /**
   * @var ws|null
   */
  private $new_instance = null;

  protected $extensions = [];
  protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS';

  /**
   * @var integer Current state
   */
  protected $state = 0; // stream state of the connection (application protocol level)

  /**
   * Alias of STATE_STANDBY
   */
  const STATE_ROOT = 0;

  /**
   * Standby state (default state)
   */
  const STATE_STANDBY = 0;

  /**
   * State: first line
   */
  const STATE_FIRSTLINE  = 1;

  /**
   * State: headers
   */
  const STATE_HEADERS    = 2;

  /**
   * State: content
   */
  const STATE_CONTENT    = 3;

  /**
   * State: prehandshake
   */
  const STATE_PREHANDSHAKE = 5;

  /**
   * State: handshaked
   */
  const STATE_HANDSHAKED = 6;

  public function get_state() {
    return $this->state;
  }

  public function get_new_instance() {
    return $this->new_instance;
  }

  public function closed() {
    return $this->closed;
  }

  protected function close() {
    if ($this->closed) return;
    var_dump('self close');
    fclose($this->socket);
    $this->closed = true;
  }
  public function __construct($socket, $on_frame_user = null) {
    stream_set_blocking($socket, false);
    $this->socket = $socket;
    $this->on_frame_user = $on_frame_user;
  }

  private function read_line() {
    $lines = explode(PHP_EOL, $this->unparsed_data);
    $last_line = $lines[count($lines)-1];
    unset($lines[count($lines) - 1]);
    foreach ($lines as $line) {
      $this->unread_lines[] = $line;
    }
    $this->unparsed_data = $last_line;
    if (count($this->unread_lines) != 0) {
      return array_shift($this->unread_lines);
    } else {
      return null;
    }
  }
  public function on_receive_data() {
    if ($this->closed) return;
    $data = stream_socket_recvfrom($this->socket, self::MAX_BUFFER_SIZE);
    if (is_string($data)) {
      $this->unparsed_data .= $data;
    }
  }
  /**
   * Called when new data received.
   * @return void
   */
  public function on_read() {
    if ($this->closed) return;
    if ($this->state === self::STATE_STANDBY) {
      $this->state = self::STATE_FIRSTLINE;
    }
    if ($this->state === self::STATE_FIRSTLINE) {
      if (!$this->http_read_first_line()) {
        return;
      }
      $this->state = self::STATE_HEADERS;
    }

    if ($this->state === self::STATE_HEADERS) {
      if (!$this->http_read_headers()) {
        return;
      }
      if (!$this->http_process_headers()) {
        $this->close();
        return;
      }
      $this->state = self::STATE_CONTENT;
    }
    if ($this->state === self::STATE_CONTENT) {
      $this->state = self::STATE_PREHANDSHAKE;
    }
  }
  /**
   * Read first line of HTTP request
   * @return boolean|null Success
   */
  protected function http_read_first_line() {
    if (($l = $this->read_line()) === null) {
      return null;
    }
    $e = explode(' ', $l);
    $u = isset($e[1]) ? parse_url($e[1]) : false;
    if ($u === false) {
      $this->bad_request();
      return false;
    }
    if (!isset($u['path'])) {
      $u['path'] = null;
    }
    if (isset($u['host'])) {
      $this->server['HTTP_HOST'] = $u['host'];
    }
    $address = explode(':', stream_socket_get_name($this->socket, true)); //получаем адрес клиента
    $srv                       = & $this->server;
    $srv['REQUEST_METHOD']     = $e[0];
    $srv['REQUEST_TIME']       = time();
    $srv['REQUEST_TIME_FLOAT'] = microtime(true);
    $srv['REQUEST_URI']        = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : '');
    $srv['DOCUMENT_URI']       = $u['path'];
    $srv['PHP_SELF']           = $u['path'];
    $srv['QUERY_STRING']       = isset($u['query']) ? $u['query'] : null;
    $srv['SCRIPT_NAME']        = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/';
    $srv['SERVER_PROTOCOL']    = isset($e[2]) ? $e[2] : 'HTTP/1.1';
    $srv['REMOTE_ADDR']        = $address[0];
    $srv['REMOTE_PORT']        = $address[1];
    return true;
  }
  /**
   * Read headers line-by-line
   * @return boolean|null Success
   */
  protected function http_read_headers() {
    while (($l = $this->read_line()) !== null) {
      if ($l === '') {
        return true;
      }
      $e = explode(': ', $l);
      if (isset($e[1])) {
        $this->current_header                = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_']));
        $this->server[$this->current_header] = $e[1];
      } elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) {
        // multiline header continued
        $this->server[$this->current_header] .= $e[0];
      } else {
        // whatever client speaks is not HTTP anymore
        $this->bad_request();
        return false;
      }
    }
  }
  /**
   * Process headers
   * @return bool
   */
  protected function http_process_headers() {
    $this->state = self::STATE_PREHANDSHAKE;
    if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) {
      $str              = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']);
      $str              = preg_replace($this->extensionsCleanRegex, '', $str);
      $this->extensions = explode(', ', $str);
    }
    if (!isset($this->server['HTTP_CONNECTION'])
      || (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade")
      || !isset($this->server['HTTP_UPGRADE'])
      || (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important
    ) {
      $this->close();
      return false;
    }
    /*
    if (isset($this->server['HTTP_COOKIE'])) {
      self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie);
    }
    if (isset($this->server['QUERY_STRING'])) {
      self::parse_str($this->server['QUERY_STRING'], $this->get);
    }
    */
    // ----------------------------------------------------------
    // Protocol discovery, based on HTTP headers...
    // ----------------------------------------------------------
    if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI
      if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14)
        $this->switch_to_protocol('v13');
      } elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol
        $this->switch_to_protocol('v13');
      } else {
        error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr
        $this->close();
        return false;
      }
    } elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
      $this->switch_to_protocol('ve');
    } else { // Defaulting to HIXIE (Safari5 and many non-browser clients...)
      $this->switch_to_protocol('v0');
    }
    // ----------------------------------------------------------
    // End of protocol discovery
    // ----------------------------------------------------------
    return true;
  }
  private function switch_to_protocol($protocol) {
    $class = 'ws_'.$protocol;
    $this->new_instance = new $class($this->socket);
    $this->new_instance->state = $this->state;
    $this->new_instance->unparsed_data = $this->unparsed_data;
    $this->new_instance->server = $this->server;
    $this->new_instance->on_frame_user = $this->on_frame_user;
  }
  /**
   * Send Bad request
   * @return void
   */
  public function bad_request() {
    $this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>");
    $this->close();
  }
  /**
   * Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX
   * @param  string  $s      String to parse
   * @param  array   &$var   Reference to the resulting array
   * @param  boolean $header Header-style string
   * @return void
   */
  public static function parse_str($s, &$var, $header = false) {
    static $cb;
    if ($cb === null) {
      $cb = function ($m) {
        return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8'));
      };
    }
    if ($header) {
      $s = strtr($s, self::$hvaltr);
    }
    if (
      (stripos($s, '%u') !== false)
      && preg_match('~(%u[a-f\d]{4}|%[c-f][a-f\d](?!%[89a-f][a-f\d]))~is', $s, $m)
    ) {
      $s = preg_replace_callback('~%(u[a-f\d]{4}|[a-f\d]{2})~i', $cb, $s);
    }
    parse_str($s, $var);
  }
  /**
   * Send data to the connection. Note that it just writes to buffer that flushes at every baseloop
   * @param  string  $data Data to send
   * @return boolean       Success
   */
  public function write($data) {
    if ($this->closed) return false;
    return stream_socket_sendto($this->socket, $data) == 0;
  }

  /**
   * Будте любезны в отнаследованном классе реализовать этот метод
   * @return bool
   */
  protected function send_handshake_reply() {
    return false;
  }
  /**
   * Called when we're going to handshake.
   * @return boolean               Handshake status
   */
  public function handshake() {
    $extra_headers = '';
    foreach ($this->headers as $k => $line) {
      if ($k !== 'STATUS') {
        $extra_headers .= $line . "\r\n";
      }
    }

    if (!$this->send_handshake_reply($extra_headers)) {
      error_log(get_class($this) . '::' . __METHOD__ . ' : Handshake protocol failure for client ""'); // $this->addr
      $this->close();
      return false;
    }

    $this->handshaked = true;
    $this->headers_sent = true;
    $this->state = static::STATE_HANDSHAKED;
    return true;
  }
  /**
   * Read from buffer without draining
   * @param integer $n Number of bytes to read
   * @param integer $o Offset
   * @return string|false
   */
  public function look($n, $o = 0) {
    if (strlen($this->unparsed_data) <= $o) {
      return '';
    }
    return substr($this->unparsed_data, $o, $n);
  }
  /**
   * Convert bytes into integer
   * @param  string  $str Bytes
   * @param  boolean $l   Little endian? Default is false
   * @return integer
   */
  public static function bytes2int($str, $l = false) {
    if ($l) {
      $str = strrev($str);
    }
    $dec = 0;
    $len = strlen($str);
    for ($i = 0; $i < $len; ++$i) {
      $dec += ord(substr($str, $i, 1)) * pow(0x100, $len - $i - 1);
    }
    return $dec;
  }
  /**
   * Drains buffer
   * @param  integer $n Numbers of bytes to drain
   * @return boolean    Success
   */
  public function drain($n) {
    $ret = substr($this->unparsed_data, 0, $n);
    $this->unparsed_data = substr($this->unparsed_data, $n);
    return $ret;
  }
  /**
   * Read data from the connection's buffer
   * @param  integer      $n Max. number of bytes to read
   * @return string|false    Readed data
   */
  public function read($n) {
    if ($n <= 0) {
      return '';
    }
    $read = $this->drain($n);
    if ($read === '') {
      return false;
    }
    return $read;
  }
  /**
   * Reads all data from the connection's buffer
   * @return string Readed data
   */
  public function read_unlimited() {
    $ret = $this->unparsed_data;
    $this->unparsed_data = '';
    return $ret;
  }
  /**
   * Searches first occurence of the string in input buffer
   * @param  string  $what  Needle
   * @param  integer $start Offset start
   * @param  integer $end   Offset end
   * @return integer        Position
   */
  public function search($what, $start = 0, $end = -1) {
    return strpos($this->unparsed_data, $what, $start);
  }
  /**
   * Called when new frame received.
   * @param  string $data Frame's data.
   * @param  string $type Frame's type ("STRING" OR "BINARY").
   * @return boolean      Success.
   */
  public function on_frame($data, $type) {
    if (is_callable($this->on_frame_user)) {
      call_user_func($this->on_frame_user, $this, $data, $type);
    }
    return true;
  }
  public function send_frame($data, $type = null, $cb = null) {
    return false;
  }
  /**
   * Get real frame type identificator
   * @param $type
   * @return integer
   */
  public function get_frame_type($type) {
    if (is_int($type)) {
      return $type;
    }
    if ($type === null) {
      $type = 'STRING';
    }
    $frametype = @constant(get_class($this) . '::' . $type);
    if ($frametype === null) {
      error_log(__METHOD__ . ' : Undefined frametype "' . $type . '"');
    }
    return $frametype;
  }
}


По сути тут добавлены 3 вещи: «соединение с клиентом на уровне веб сокета», «получение сообщения от клиента», «отправка сообщения клиенту».

Для начала немного теории и терминологии. «Handshake» — это с точки зрения веб сокетов процедура установления соединения поверх http. Надо ведь решить кучу вопросов: как пробиться сквозь гущу прокси и кэшэй, как защитится от злых хакеров. И термин «frame» — это кусок данных в расшифрованном виде, это сообщение от клиента или сообщение для клиента. Возможно, об этом стоило написать в начале статьи, но из-за этих вот «frame» делать сокет сервер в блокирующем режиме сокетов имхо бессмысленно. То, как сделан этот момент вот тут — это лишило меня сна не на одну ночь. В той статье не рассматривается вариант, что frame приехал не полностью или их приехало сразу два. И то и то, между прочим, вполне себе типичная ситуация, как показали логи игры.

Теперь к деталям.

Соединение с клиентом на уровне веб сокета — предполагается, что протокол (например, ws_v0) перекроет метод «on_read» и внутри себя дёрнет «handshake», когда данных будет достаточно. Далее кусок «handshake» в родителе. Далее дёргается метод «send_handshake_reply», который должен быть реализован в протоколе. Этот вот «send_handshake_reply» должен такое ответить клиенту, чтобы тот понял, что «соединение установлено», нормальным браузерам — нормальный ответ, а для Сафари — особый ответ.

Получение сообщения от клиента. Обращаю внимание, что глупые клиенты могут реализовать такой вариант, что соединение не установлено, а сообщение от пользователя уже пришло. Поэтому надо бережно относится к «unparsed_data» переменной. В каждом протоколе метод «on_read» должен понять размер передаваемого frame, убедиться, что frame целиком приехал, расшифровать приехавший frame в сообщение пользователя. В каждом протоколе это делается очень по-разному и очень кучеряво (мы ж не знаем, приехал frame полностью или нет, плюс нельзя откусить ни байта следующего frame). Далее внутри «on_read», когда данные клиента получены и расшифрованы и определён их тип (да-да и такое предусмотрено), дёргаем метод «on_frame», который внутри класса «ws», тот, в свою очередь, дёрнет custom callback (функция $my_callback, перед основным циклом которая). И в итоге $my_callback получает сообщение от клиента.

Отправка сообщения клиенту. Просто дёргается метод «send_frame», который должен быть реализован внутри протокола. Тут просто шифруем сообщение и отправляем пользователю. Разные протоколы шифруют по-разному.

Теперь прилагаю 3 протокола «v13», «v0», «ve»:

Файл <ws_v13.php>
class ws_v13 extends ws {
  const CONTINUATION = 0;
  const STRING       = 0x1;
  const BINARY       = 0x2;
  const CONNCLOSE    = 0x8;
  const PING         = 0x9;
  const PONG         = 0xA;
  protected static $opcodes = [
    0   => 'CONTINUATION',
    0x1 => 'STRING',
    0x2 => 'BINARY',
    0x8 => 'CONNCLOSE',
    0x9 => 'PING',
    0xA => 'PONG',
  ];
  protected $outgoingCompression = 0;

  protected $framebuf = '';

  /**
   * Apply mask
   * @param $data
   * @param string|false $mask
   * @return mixed
   */
  public function mask($data, $mask) {
    for ($i = 0, $l = strlen($data), $ml = strlen($mask); $i < $l; $i++) {
      $data[$i] = $data[$i] ^ $mask[$i % $ml];
    }
    return $data;
  }

  /**
   * Sends a frame.
   * @param  string   $data  Frame's data.
   * @param  string   $type  Frame's type. ("STRING" OR "BINARY")
   * @param  callable $cb    Optional. Callback called when the frame is received by client.
   * @callback $cb ( )
   * @return boolean         Success.
   */
  public function send_frame($data, $type = null, $cb = null) {
    if (!$this->handshaked) {
      return false;
    }

    if ($this->closed && $type !== 'CONNCLOSE') {
      return false;
    }

    /*if (in_array($type, ['STRING', 'BINARY']) && ($this->outgoingCompression > 0) && in_array('deflate-frame', $this->extensions)) {
        //$data = gzcompress($data, $this->outgoingCompression);
        //$rsv1 = 1;
    }*/

    $fin = 1;
    $rsv1 = 0;
    $rsv2 = 0;
    $rsv3 = 0;
    $this->write(chr(bindec($fin . $rsv1 . $rsv2 . $rsv3 . str_pad(decbin($this->get_frame_type($type)), 4, '0', STR_PAD_LEFT))));
    $dataLength  = strlen($data);
    $isMasked    = false;
    $isMaskedInt = $isMasked ? 128 : 0;
    if ($dataLength <= 125) {
      $this->write(chr($dataLength + $isMaskedInt));
    } elseif ($dataLength <= 65535) {
      $this->write(chr(126 + $isMaskedInt) . // 126 + 128
        chr($dataLength >> 8) .
        chr($dataLength & 0xFF));
    } else {
      $this->write(chr(127 + $isMaskedInt) . // 127 + 128
        chr($dataLength >> 56) .
        chr($dataLength >> 48) .
        chr($dataLength >> 40) .
        chr($dataLength >> 32) .
        chr($dataLength >> 24) .
        chr($dataLength >> 16) .
        chr($dataLength >> 8) .
        chr($dataLength & 0xFF));
    }
    if ($isMasked) {
      $mask    = chr(mt_rand(0, 0xFF)) .
        chr(mt_rand(0, 0xFF)) .
        chr(mt_rand(0, 0xFF)) .
        chr(mt_rand(0, 0xFF));
      $this->write($mask . $this->mask($data, $mask));
    } else {
      $this->write($data);
    }
    if ($cb !== null) {
      $cb();
    }
    return true;
  }

  /**
   * Sends a handshake message reply
   * @param string Received data (no use in this class)
   * @return boolean OK?
   */
  public function send_handshake_reply($extraHeaders = '') {
    if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY']) || !isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) {
      return false;
    }
    if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '13' && $this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '8') {
      return false;
    }

    if (isset($this->server['HTTP_ORIGIN'])) {
      $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = $this->server['HTTP_ORIGIN'];
    }
    if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
      $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
    }
    $this->write("HTTP/1.1 101 Switching Protocols\r\n"
      . "Upgrade: WebSocket\r\n"
      . "Connection: Upgrade\r\n"
      . "Date: " . date('r') . "\r\n"
      . "Sec-WebSocket-Origin: " . $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] . "\r\n"
      . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n"
      . "Sec-WebSocket-Accept: " . base64_encode(sha1(trim($this->server['HTTP_SEC_WEBSOCKET_KEY']) . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true)) . "\r\n"
    );
    if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
      $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
    }

    $this->write($extraHeaders."\r\n");

    return true;
  }
  /**
   * Called when new data received
   * @see http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10#page-16
   * @return void
   */
  public function on_read() {
    if ($this->closed) return;
    if ($this->state === self::STATE_PREHANDSHAKE) {
      if (!$this->handshake()) {
        return;
      }
    }
    if ($this->state === self::STATE_HANDSHAKED) {
      while (($buflen = strlen($this->unparsed_data)) >= 2) {
        $first = ord($this->look(1)); // first byte integer (fin, opcode)
        $firstBits = decbin($first);
        $opcode = (int)bindec(substr($firstBits, 4, 4));
        if ($opcode === 0x8) { // CLOSE
          $this->close();
          return;
        }
        $opcodeName = isset(static::$opcodes[$opcode]) ? static::$opcodes[$opcode] : false;
        if (!$opcodeName) {
          error_log(get_class($this) . ': Undefined opcode ' . $opcode);
          $this->close();
          return;
        }
        $second = ord($this->look(1, 1)); // second byte integer (masked, payload length)
        $fin = (bool)($first >> 7);
        $isMasked = (bool)($second >> 7);
        $dataLength = $second & 0x7f;
        $p = 2;
        if ($dataLength === 0x7e) { // 2 bytes-length
          if ($buflen < $p + 2) {
            return; // not enough data yet
          }
          $dataLength = self::bytes2int($this->look(2, $p), false);
          $p += 2;
        } elseif ($dataLength === 0x7f) { // 8 bytes-length
          if ($buflen < $p + 8) {
            return; // not enough data yet
          }
          $dataLength = self::bytes2int($this->look(8, $p));
          $p += 8;
        }
        if (self::maxAllowedPacket <= $dataLength) {
          // Too big packet
          $this->close();
          return;
        }
        if ($isMasked) {
          if ($buflen < $p + 4) {
            return; // not enough data yet
          }
          $mask = $this->look(4, $p);
          $p += 4;
        }
        if ($buflen < $p + $dataLength) {
          return; // not enough data yet
        }
        $this->drain($p);
        $data = $this->read($dataLength);
        if ($isMasked) {
          $data = $this->mask($data, $mask);
        }
        //Daemon::log(Debug::dump(array('ext' => $this->extensions, 'rsv1' => $firstBits[1], 'data' => Debug::exportBytes($data))));
        /*if ($firstBits[1] && in_array('deflate-frame', $this->extensions)) { // deflate frame
            $data = gzuncompress($data, $this->pool->maxAllowedPacket);
        }*/
        if (!$fin) {
          $this->framebuf .= $data;
        } else {
          $this->on_frame($this->framebuf . $data, $opcodeName);
          $this->framebuf = '';
        }
      }
    }
  }
}


Файл <ws_v0.php>
class ws_v0 extends ws {
  const STRING = 0x00;
  const BINARY = 0x80;

  protected $key;

  /**
   * Sends a handshake message reply
   * @param string Received data (no use in this class)
   * @return boolean OK?
   */
  public function send_handshake_reply($extraHeaders = '') {
    if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
      return false;
    }
    $final_key = $this->_computeFinalKey($this->server['HTTP_SEC_WEBSOCKET_KEY1'], $this->server['HTTP_SEC_WEBSOCKET_KEY2'], $this->key);
    $this->key = null;

    if (!$final_key) {
      return false;
    }

    if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
      $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
    }

    $this->write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
      . "Upgrade: WebSocket\r\n"
      . "Connection: Upgrade\r\n"
      . "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n"
      . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n");
    if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
      $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
    }
    $this->write($extraHeaders . "\r\n" . $final_key);
    return true;
  }

  /**
   * Computes final key for Sec-WebSocket.
   * @param string Key1
   * @param string Key2
   * @param string Data
   * @return string Result
   */
  protected function _computeFinalKey($key1, $key2, $data) {
    if (strlen($data) < 8) {
      error_log(get_class($this) . '::' . __METHOD__ . ' : Invalid handshake data for client ""'); // $this->addr
      return false;
    }
    return md5($this->_computeKey($key1) . $this->_computeKey($key2) . substr($data, 0, 8), true);
  }

  /**
   * Computes key for Sec-WebSocket.
   * @param string Key
   * @return string Result
   */
  protected function _computeKey($key) {
    $spaces = 0;
    $digits = '';

    for ($i = 0, $s = strlen($key); $i < $s; ++$i) {
      $c = substr($key, $i, 1);

      if ($c === "\x20") {
        ++$spaces;
      } elseif (ctype_digit($c)) {
        $digits .= $c;
      }
    }

    if ($spaces > 0) {
      $result = (float)floor($digits / $spaces);
    } else {
      $result = (float)$digits;
    }

    return pack('N', $result);
  }

  /**
   * Sends a frame.
   * @param  string   $data  Frame's data.
   * @param  string   $type  Frame's type. ("STRING" OR "BINARY")
   * @param  callable $cb    Optional. Callback called when the frame is received by client.
   * @callback $cb ( )
   * @return boolean         Success.
   */
  public function send_frame($data, $type = null, $cb = null) {
    if (!$this->handshaked) {
      return false;
    }

    if ($this->closed && $type !== 'CONNCLOSE') {
      return false;
    }
    if ($type === 'CONNCLOSE') {
      if ($cb !== null) {
        $cb($this);
        return true;
      }
    }

    $type = $this->get_frame_type($type);
    // Binary
    if (($type & self::BINARY) === self::BINARY) {
      $n   = strlen($data);
      $len = '';
      $pos = 0;

      char:

      ++$pos;
      $c = $n >> 0 & 0x7F;
      $n >>= 7;

      if ($pos !== 1) {
        $c += 0x80;
      }

      if ($c !== 0x80) {
        $len = chr($c) . $len;
        goto char;
      };

      $this->write(chr(self::BINARY) . $len . $data);
    }
    // String
    else {
      $this->write(chr(self::STRING) . $data . "\xFF");
    }
    if ($cb !== null) {
      $cb();
    }
    return true;
  }

  /**
   * Called when new data received
   * @return void
   */
  public function on_read() {
    if ($this->state === self::STATE_PREHANDSHAKE) {
      if (strlen($this->unparsed_data) < 8) {
        return;
      }
      $this->key = $this->read_unlimited();
      $this->handshake();
    }
    if ($this->state === self::STATE_HANDSHAKED) {
      while (($buflen = strlen($this->unparsed_data)) >= 2) {
        $hdr = $this->look(10);
        $frametype = ord(substr($hdr, 0, 1));
        if (($frametype & 0x80) === 0x80) {
          $len = 0;
          $i = 0;
          do {
            if ($buflen < $i + 1) {
              // not enough data yet
              return;
            }
            $b = ord(substr($hdr, ++$i, 1));
            $n = $b & 0x7F;
            $len *= 0x80;
            $len += $n;
          } while ($b > 0x80);

          if (self::maxAllowedPacket <= $len) {
            // Too big packet
            $this->close();
            return;
          }

          if ($buflen < $len + $i + 1) {
            // not enough data yet
            return;
          }
          $this->drain($i + 1);
          $this->on_frame($this->read($len), 'BINARY');
        } else {
          if (($p = $this->search("\xFF")) !== false) {
            if (self::maxAllowedPacket <= $p - 1) {
              // Too big packet
              $this->close();
              return;
            }
            $this->drain(1);
            $data = $this->read($p);
            $this->drain(1);
            $this->on_frame($data, 'STRING');
          } else {
            if (self::maxAllowedPacket < $buflen - 1) {
              // Too big packet
              $this->close();
              return;
            }
            // not enough data yet
            return;
          }
        }
      }
    }
  }
}


Файл <ws_ve.php>
class ws_ve extends ws {
  const STRING = 0x00;
  const BINARY = 0x80;

  /**
   * Sends a handshake message reply
   * @param string Received data (no use in this class)
   * @return boolean OK?
   */
  public function send_handshake_reply($extraHeaders = '') {
    if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
      $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
    }

    $this->write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
      . "Upgrade: WebSocket\r\n"
      . "Connection: Upgrade\r\n"
      . "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n"
      . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n"
    );
    if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
      $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
    }
    $this->write($extraHeaders."\r\n");
    return true;
  }

  /**
   * Computes key for Sec-WebSocket.
   * @param string Key
   * @return string Result
   */
  protected function _computeKey($key) {
    $spaces = 0;
    $digits = '';

    for ($i = 0, $s = strlen($key); $i < $s; ++$i) {
      $c = substr($key, $i, 1);

      if ($c === "\x20") {
        ++$spaces;
      } elseif (ctype_digit($c)) {
        $digits .= $c;
      }
    }

    if ($spaces > 0) {
      $result = (float)floor($digits / $spaces);
    } else {
      $result = (float)$digits;
    }

    return pack('N', $result);
  }

  /**
   * Sends a frame.
   * @param  string   $data  Frame's data.
   * @param  string   $type  Frame's type. ("STRING" OR "BINARY")
   * @param  callable $cb    Optional. Callback called when the frame is received by client.
   * @callback $cb ( )
   * @return boolean         Success.
   */
  public function send_frame($data, $type = null, $cb = null) {
    if (!$this->handshaked) {
      return false;
    }

    if ($this->closed && $type !== 'CONNCLOSE') {
      return false;
    }

    if ($type === 'CONNCLOSE') {
      if ($cb !== null) {
        $cb($this);
        return true;
      }
    }

    // Binary
    $type = $this->get_frame_type($type);
    if (($type & self::BINARY) === self::BINARY) {
      $n   = strlen($data);
      $len = '';
      $pos = 0;

      char:

      ++$pos;
      $c = $n >> 0 & 0x7F;
      $n >>= 7;

      if ($pos !== 1) {
        $c += 0x80;
      }

      if ($c !== 0x80) {
        $len = chr($c) . $len;
        goto char;
      };

      $this->write(chr(self::BINARY) . $len . $data);
    }
    // String
    else {
      $this->write(chr(self::STRING) . $data . "\xFF");
    }
    if ($cb !== null) {
      $cb();
    }
    return true;
  }

  /**
   * Called when new data received
   * @return void
   */
  public function on_read() {
    while (($buflen = strlen($this->unparsed_data)) >= 2) {
      $hdr       = $this->look(10);
      $frametype = ord(substr($hdr, 0, 1));
      if (($frametype & 0x80) === 0x80) {
        $len = 0;
        $i   = 0;
        do {
          if ($buflen < $i + 1) {
            return;
          }
          $b = ord(substr($hdr, ++$i, 1));
          $n = $b & 0x7F;
          $len *= 0x80;
          $len += $n;
        } while ($b > 0x80);

        if (self::maxAllowedPacket <= $len) {
          // Too big packet
          $this->close();
          return;
        }

        if ($buflen < $len + $i + 1) {
          // not enough data yet
          return;
        }

        $this->drain($i + 1);
        $this->on_frame($this->read($len), $frametype);
      } else {
        if (($p = $this->search("\xFF")) !== false) {
          if (self::maxAllowedPacket <= $p - 1) {
            // Too big packet
            $this->close();
            return;
          }
          $this->drain(1);
          $data = $this->read($p);
          $this->drain(1);
          $this->on_frame($data, 'STRING');
        } else {
          if (self::maxAllowedPacket < $buflen - 1) {
            // Too big packet
            $this->close();
            return;
          }
        }
      }
    }
  }
}


Сразу хочу отметить, что протокол VE не тестировал — понятия не имею кто его использует. Но добросовестно сконвертировал и урезал код из PhpDeamon.

Протокол V13 используют все нормальные браузеры (FireFox, Opera, Chrome, Яндекс). Даже IE его использует (извините, после IE6 — для меня IE никогда не будет «браузером», даже команда разработчик IE заявляли, что это «не браузер, а тонкий клиент»). Протокол V0 использует браузер «Сафари».

Вместо заключения


Спасибо за внимание, используйте на здоровье весь приведенный выше код (разумеется, я советую завернуть его в нормальные объекты, тут всё упрощено исключительно для понимания. Особенно callback на пришедший от пользователя frame советую сделать по-нормальному). Если вы будете использовать этот код, напишите пожалуйста где-то в коде «Спасибо Anlide и PhpDeamon». В итоге сокет сервер, приведенный тут, совместим со всеми современными браузерами. Работает без утечек памяти и годится для использования в высоконагруженных системах.

Обновление:
  • Комментарий автора статьи, на которую я постоянно ссылаюсь в тексте: habrahabr.ru/post/301822/#comment_9634636
  • Метод read_lint() содержит ошибку — что мы читаем данные тела http запроса, хотя должны были читать только заголовки.
  • В основном теле цикла — не корректное использование указателей при переключении протокола.
  • По просьбам трудящихся вот ссылка на gitbub github.com/anlide/websocket тут код исправленный и ещё ping-pong доработанный, осталось ещё причину закрытия сокета фиксировать и заменить select на что-то — и будет замечательная смесь лучших серверных решений по websocket.
Поделиться с друзьями
-->

Комментарии (66)


  1. omatosan
    26.05.2016 14:07
    +1

    А почему отказались от использования PhpDeamon?


    1. anlide
      26.05.2016 15:04

      Потому что в этой задаче необходимо выполнять очень много математики под нагрузкой. Соответственно необходимо было сделать многопоточность в рамках php (напоминаю, что это делается только созданием новых потоков). А это в свою очередь требует использовать stream_select общий для дочерних процессов, и для websocket, и для админки. И этот подход мне хорошо знаком.
      С другой стороны PhpDeamon в основе использует EventBuffer — с которым я совсем незнаком и грамотное использование этого дела под нагрузкой — это неизвестно сколько времени и нервов и потерянных пользователей.


      1. Fesor
        26.05.2016 21:06

        Соответственно необходимо было сделать многопоточность в рамках php

        pthreads не рассматривали? Ну мол что бы вынести вычисления в отдельный пул тредов и разгрузить воркер обработки запросов. Внутри потоков завести очереди, и разруливать кто что делает. Можно обойтись без каких-либо локов и т.д. и получить максимальную утилизацию CPU.


        напоминаю, что это делается только созданием новых потоков

        тут вы подразумеваете именно стримы, потоки ввода/вывода. Я раза 3 перечитал ваш комментарий что бы понять к чему тут потоки, дочерние и stream_select.


        Ну то есть в приведенном вами коде у нас один процесс, один поток, и неблокируемые потоки ввода/вывода. Никакой многопоточности. Как и дочерних процессов собственно.


        1. anlide
          26.05.2016 21:37

          pthreads к сожалению не рассматривал. Отстал от жизни, изучу это. Бегло осмотрел — похоже это то, что мне было надо, а я по сути написал и оттестировал самодельный pthreads на чистом php.

          «напоминаю, что это делается только созданием новых потоков» очень извиняюсь, опечатался.
          «напоминаю, что это делается только созданием новых процессов» я хотел сказать.

          В приведенном коде да, stream_select работает только с WebSocket. Можно наверное больше кода игрового сервера привести «аналог pthreads» — оно кому-то интересно?

          Игровой сервер используется как сервис linux. То есть "/etc/init.d/game restart". Соответственно игровой сервер корректно реагирует на события из ОСи (выключение, перезагрузка, перезапуск сервиса). Работает мастер-процесс и несколько дочерних процессов (по количеству процессоров + 1 для очень долгих и низко-приоритетных задач). Приведенный в статье код — это кусок мастера.


          1. Fesor
            26.05.2016 22:10

            а я по сути написал и оттестировал самодельный pthreads на чистом php.

            Так, давайте все же проясним. Вот способы работы с вводом/выводом в порядке эффективности (последний — самый эффективный):


            • процессы
            • потоки
            • неблокируемые вызовы + stream_select или если проще — event loop

            По сути вы последнее и реализовали, правда есть более эффективные реализации (reactphp + libev/libuv) но это уже мелочи.


            То есть для web-sockets как раз таки ваш подход очень хорошо подходит. Просто что бы вычисления не стопорили обработку запросов — можно было это дело мувнуть в отдельный трэд. Тогда и накладных расходов на взаимодейтсвие было бы меньше и работало оно независимо.


            1. anlide
              26.05.2016 23:57
              -1

              Да «event loop» наверное можно назвать. Разумеется это всё надо, чтобы обработка запросов не стопорила общение клиентов с сервером. Темболее в данном проекте некоторые рядовые запросы могут обрабатываться несколько секунд.

              reactphp websocket — он же http://socketo.me/. Уже советовали посмотреть, посмотрю чем он лучше. Кроме того, что они поймали и пофиксили кучу багов — есть инфа чем он лучше, чем самодельный event loop?


              1. Fesor
                27.05.2016 01:01

                есть инфа чем он лучше, чем самодельный event loop?

                1) проверенный в бою
                2) не нужно поддерживать самому
                3) есть готовый адаптер для libev/libuv. К сожалению тот же libuv имел бы для вас смысл если бы помимо парсинга http он предоставлял так же обработку websockets. Но для ребят, которые пишут на неумираемом PHP он неплохо облегчает жизнь.


                1. anlide
                  27.05.2016 01:26

                  Читаю вот код из «socketo.me» — там ребята тоже очень не любят Сафари :)

                  Вот камменты в протоколе для Сафари:

                  FOR THE LOVE OF BEER, PLEASE PLEASE PLEASE DON'T allow the use of this in your application!

                  The Hixie76 is currently implemented by Safari

                  Ща внимательно всё просмотрю, может тут реализован последний протокол так, что нет ошибки о которой я писал «firefox@linux ping frame» у PhpDeamon.

                  Кроме того меня очень радует, что названия протоколов нормальные:
                  • Hixie76: http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76
                  • RFC6455: http://tools.ietf.org/html/rfc6455
                  • HyBi10 extends RFC6455


        1. anlide
          02.06.2016 03:27

          pthreads не рассматривали?

          Вот рассмотрел повнимательнее. Попробовал. Воодушевился, собрался переписывать всё с использованием потоков. Но…
          В документации и примерах нет информации как ворочать большие объёмы данных между потоками. Или я не нашёл?
          $var = true;
          class Test extends Thread {
            public function run() {
              global $var;
              $var = false;
            }
          }
          $test = new Test();
          $test->start();
          $test->join();
          var_dump($var);
          

          Приведенный код не работает — то есть выводит «true», хотя я ожидаю «false».

          Мне очень хотелось бы, чтобы переменная $var не дублировалась, не синхронизировалась, а именно была бы в родительском потоке. Ну вобщем любым способом избежать дублирования данных в физической памяти. Это возможно в pthreads или нет?


          1. Borro
            02.06.2016 09:26

            Посмотрите на этот пример. Так же есть хорошая статья на тему.


            1. anlide
              02.06.2016 15:45

              <?php
              class Connect extends Worker {
                public function getStorage() {
                  if (!self::$storage) {
                    self::$storage = array();
                    for ($i = 0; $i < 1000000; $i++) self::$storage[$i] = rand(0, 256 * 256 - 1);
                  }
                  var_dump(memory_get_usage());
                  return self::$storage;
                }
                protected static $storage;
              }
              class Query extends Threaded {
                public function run() {
                  $this->data = $this->worker->getStorage();
                  sleep(5);
                  var_dump(memory_get_usage());
                  var_dump(count($this->data));
                }
                protected $data;
              }
              $pool = new Pool(4, "Connect", []);
              $pool->submit
              (new Query());
              $pool->submit
              (new Query());
              $pool->submit
              (new Query());
              $pool->submit
              (new Query());
              $pool->submit
              (new Query());
              $pool->submit
              (new Query());
              $pool->shutdown();
              

              Я взял этот пример и чуть подправил его. И попробовал в разном месте по разному расставить memory_get_usage.
              И выяснилось, что если мы записываем ответ getStorage в локальную переменную, например $data — то расхода памяти нет. Как будто ссылка передалась.
              Если записать в $this->data — то данные полностью копируются.
              Попытка использовать "&" приводит к крашу CLI.
              Возвращаясь к $data я попробовал перезаписать данные пустым массивом — не получается. То есть локальная переменная становится пустым массивом, а в Connect::$storage продолжает лежать большой кусок данных.

              Вывод какой — если используется Pool, то конкретные задачи могут получить данные только для чтения из Worker без потери памяти. Запись в Worker невозможно.

              Вывод правильный?


              1. Borro
                02.06.2016 21:54

                Немного не понял.
                Запоминаем в треды, потом используем в основном потоке:

                <?php
                class TestWork extends \Threaded implements \Collectable
                {
                    public $result;
                    public function run()
                    {
                        sleep(1);
                        $this->result = random_int(0, PHP_INT_MAX);
                    }
                }
                
                echo 'Start', PHP_EOL;
                $pool = new \Pool(4, \Worker::class);
                $time = microtime(true);
                $works = [];
                for ($i = 0; $i < 10; $i++) {
                    $work = new TestWork();
                    $pool->submit($work);
                    $works[] = $work;
                }
                $pool->shutdown();
                $time = microtime(true) - $time;
                
                foreach ($works as $work) {
                    echo 'Work result: ', $work->result, PHP_EOL;
                }
                echo 'End at ' . $time, PHP_EOL;
                

                Выдает:
                Start
                Work result: 7905944565329289890
                Work result: 5108499292478502335
                Work result: 5714338237394927753
                Work result: 7690405846878811114
                Work result: 8972815893492806475
                Work result: 7717729732644902772
                Work result: 3482825673580680883
                Work result: 5913335648957882133
                Work result: 7176745494439780246
                Work result: 3082331963992595859
                End at 3.005893945694


                Попробовал изменить ваш код. В каждый воркер можно писать:
                <?php
                class Connect extends Worker
                {
                    public function getStorage()
                    {
                        if (!self::$storage) {
                            self::$storage = [];
                        }
                        self::$storage[] = random_int(0, PHP_INT_MAX);
                        return self::$storage;
                    }
                
                    protected static $storage;
                }
                
                class Query extends Threaded
                {
                    public function run()
                    {
                        $this->data = $this->worker->getStorage();
                        var_dump(count($this->data));
                        sleep(5);
                    }
                
                    protected $data;
                }
                
                $pool = new Pool(4, "Connect", []);
                $pool->submit(new Query());
                $pool->submit(new Query());
                $pool->submit(new Query());
                $pool->submit(new Query());
                $pool->submit(new Query());
                $pool->submit(new Query());
                $pool->shutdown();
                

                Вывод:
                int(1)
                int(1)
                int(1)
                int(1)
                int(2)
                int(2)


                1. anlide
                  03.06.2016 03:12

                  В первом примере данные хранятся в самих потоках, там же запись в них. Далее чтение этих данных из основного потока, я так понимаю без физического копирования данных.
                  Во втором примере данные хранятся в экземплярах класса Connect и конкретная задача их успешно читает, но это чтение данных внутри одного потока. А запись осуществляется дёрганьем метода, а не напрямую.

                  Но я собственно на такое чтение данных и не жаловался. Как мне в дочернем потоке почитать данные из родительского потока не дублируя при этом данные в физической памяти? Как мне из дочернего потока изменить данные в родительском потоке? Без этого для моей задачи использовать pthreads бессмысленно. И если погуглить «c++ многопоточность память» то видно, что в C++ таких проблем нет.

                  К чему этот разговор вообще веду. Вот у меня есть реализованый механизм распределения задач в несколько процессов, которые общаются через сокеты. Общаются посылая сигналы друг-другу. Это собственно первая проблема — что общаться они должны только сигналами. Это очень неудобно и громоздко. Большие куски данных хранятся там где собственно используются (например данные по конкретному игроку, а они довольно объёмные), если другому процессу нужны данные из этого процесса — то он их тянет из БД или процесса, поработает с ними и удаляет из своей памяти. Это вторая проблема — хотелось жеж бы просто по id пользователя обратится куда-то в памяти, получить нужные данные и всё, не дублируя объёмные данные, не реализуя сигналы. Кроме того, есть большой массив констант типа тексты заданий, куча настроек игрового процесса и т.д. — это тоже желательно не дублировать, один экземпляр на процесс.

                  Так вот я надеялся, что pthread решат эти две проблемы. Но похоже он не решает ни того ни другого и добавляют ещё одну. Я надеялся, что у меня будет одна большая переменная с текстами-настройками игры, будет один большой массив с игроками. И потоки будут просто получать задачи и указатели на память и работать над ними. Не дублируя данные. Код был бы маленький и очень простой. Память использовалась бы максимально эффективно. Процессоры при необходимости загружались бы на максимум. Новые фичи писать в такой системе было бы легко.

                  Но что же это получается с pthreads — поток должен к себе полностью скопировать всё, обработать, изменить и отправить обратно? Опять сигналы вместо прямого обращения к данным? Далее по конкретному игроку — пока он онлайн, для него формируется большой кэш данных, который ещё и меняется постоянно, его тоже надо весь скопировать в поток, а потом записать обратно? И бонусная проблема — тексты-настройки игры тоже надо копировать целиком в поток?

                  Я надеюсь что ошибаюсь. Или нет?


                  1. Borro
                    03.06.2016 09:09

                    Понял. Тогда может быть пример с каналом вам подойдет? Вообще в примерах много интересного можно найти.


          1. Fesor
            02.06.2016 10:49

            Разработчики pthreads специально сделали это ограничение, каждый поток со своим стораджем для переменных. Раньше можно было "расшаривать" данные делая их глобальными, но эту возможность убрали. Теперь можно только пересылать данные между потоками. И это правильно и сильно уменьшает риск выстрела в ногу.


            1. SerafimArts
              02.06.2016 12:22

              Кстати, раз уж заговорили о pthreads — не сталкивался никогда с тем, что оно не позволяет передавать между потоками сложные структуры данных, вроде объектов (включая инстансы \Closure)?


              У меня такое ощущение сложилось, что оно при передаче данных между потоками — сериализует их, а потом распаковывает, отсюда и косяк с замыканиями.


              1. Fesor
                02.06.2016 20:31

                не сталкивался никогда с тем, что оно не позволяет передавать между потоками сложные структуры данных

                Можно передавать только то что можно сериализовать. Сталкивался с этим пару лет назад когда пытался вынести доктрину в отдельный пул потоков для организации асинхронной работы… и проиграл поскольку сущности с проксями передать не выходило (там entity manager у каждой прокси есть).


                что оно при передаче данных между потоками — сериализует их

                именно так внутри и происходит и об этом есть в документации)


    1. Fesor
      26.05.2016 20:54
      +1

      PhpDeamon на сегодняшний день морально устаревшее решение.


  1. hockfan
    26.05.2016 14:07
    +1

    Я, видимо, еще из тех стариков извращенцев у кого очень сложный протокол WebSocket'а реализован на C/C++. Спасибо работодателю за терпение!


    1. DeLuxis
      26.05.2016 14:19

      А можно исходники глянуть?


    1. GamePad64
      26.05.2016 16:56

      Вот, кстати, хорошая C++-библиотечка для реализации WebSockets: WebSocket++ Основана на boost.asio, умеет многое, и очень расширяемая.
      Ещё в Qt есть QWebSocket. Для простеньких приложений самое то, но не полностью поддерживает RFC6455 (например, не умеет Subprotocol).


    1. MacIn
      26.05.2016 19:27
      -2

      Ха, только недавно делал бэкпорт WS под Delphi 7.


    1. Videoman
      26.05.2016 23:44

      Да, нет. Мы вот тоже реализовали свой велосипед на С++. Правда, даже не поняли что он сложный. Гоняем по нему видео для MSE. Отлично работает на всех клиентах, кроме IOS-based.


      1. Fesor
        26.05.2016 23:55

        Отлично работает на всех клиентах, кроме IOS-based.

        что ж вы так отрубаете платежеспособных кастомеров?)


        Справедливости ради речь у вас скорее идет не о web-сокетах а о старых добрых tcp сокетах.


        1. Videoman
          27.05.2016 01:10

          Ну, к счастью, клиентов на IOS у нас пока нет. Мы используем Web клиент как дополнительный тонкий клиент к основному, без процедуры установки. Все работает только во внутри-корпоративных сетях, где очень мало мобильных клиентов. Не совсем понял про «старые» советы. Веб-сокеты, действительно, реализованы поверх обычных tcp, но сверху там еще есть: handshake, отдельные сообщения с заголовками и т.д. Что делать с IOS пока не понятно, так как MSE там выпилено, даже в Хроме.


          1. anlide
            27.05.2016 01:18

            В этой ветке вы об этом прямо не говорите, но похоже вы поймали ту же проблему из-за которой была написана эта статья.

            Я постоянно ссылаюсь на https://habrahabr.ru/post/209864/ что он хорош и прост, но safary с ним не работает. То есть IOS не работает. А чтобы работало под IOS — надо посложнее код написать. Собственно об этом «посложнее» и идёт вся эта статья.


            1. Videoman
              27.05.2016 11:27

              Не, я немного не об этом. Веб-сокеты, как раз, работают на всех, более или менее, современных платформах. Не работает MSE (Media Source Extensions). В моем случае, к счастью, нет необходимости поддерживать старые браузеры (до принятия WebSockets, как стандарта).


              1. anlide
                27.05.2016 11:53

                Я говорю о последней версии браузера Сафари.


  1. bat
    26.05.2016 14:08
    +4

    Спасибо за внимание, используйте на здоровье весь приведенный выше код (разумеется, я советую завернуть его в нормальные объекты, тут всё упрощено исключительно для понимания. Особенно callback на пришедший от пользователя frame советую сделать по-нормальному). Если вы будете использовать этот код, напишите пожалуйста где-то в коде «Спасибо Anlide и PhpDeamon».

    Ну так оформите нормально, опубликуйте код и документацию, поддерживайте, тогда вам скажут спасибо.
    А так — мыши и кактус


    1. anlide
      26.05.2016 15:25
      -1

      Я вроде подчеркнул не раз в тексте, что код максимально упрощён с ООП до процедурного стиля. Соответственно в тупую использовать этот код целиком нельзя. Но в процедурном стиле — проще всего изложить что происходит в коде и читателю проще выдернуть нужные куски и вставить себе в код.
      Для стиля «давайте сделаем побыстрее» — вся статья неприемлема, то есть надо использовать PhpDeamon, socketo.me или ещё какое-то готовое решение. А для стиля «давайте сделаем хорошо» — статья то, что надо. То есть читатель пишет свой сервис, у него свой основной цикл stream_select и ему в таком виде намного проще надёргать куски кода себе, чем дёргать куски кода из готового решения.
      Если речь идёт про «а шо такое websocket» — то это статья не про это.

      Хотя я собираюсь опубликовать на github и запостить тут ссылку туда. И есть небольшая ошибка в методе «read_line» и небольшая опечатка в указателях основного цикла — разумеется надо как-то опубликовать исправленный код.


      1. bat
        27.05.2016 06:23
        -1

        Про оформление и поддержку был скорее сарказм…

        Зачем придумывать и героически преодолевать трудности? Да на php можно написать http-сервер, websocket-сервер. Но зачем? Когда есть готовые решения? пусть и на других платформах. Например, nodejs — event loop и http из коробки, реализации ws на выбор, и все это отлично работает + язык знакомый для вебразработчика. Или, например, go — тоже все есть, да, может язык и незнакомый, но эффективнее по по процессору и памяти.


  1. akeinhell
    26.05.2016 14:08

    Не рассматривали уже готовые решения?
    К примеру очень хороший вариант socketo.me/docs


    1. anlide
      26.05.2016 15:14

      Я рассматривал только PhpDeamon на этапе проектирования.
      https://habrahabr.ru/post/301822/#comment_9627298 тут написал почему отказался от его использования.
      socketo.me обязательно просмотрю — может ещё чего-то важное найду.

      Пока предварительная информация — но похоже в коде PhpDeamon / WebSocket / V13 есть ошибка, код некорректно реагирует на ping frame. Из-за чего FF@Linux разрывает websocket соединение. Подчёркиваю, что это требует перепроверки.
      Обращаю внимание, что поиск и исправление подобной ошибки в сторонней библиотеке существенно сложнее чем в своей.


  1. SerafimArts
    26.05.2016 19:35
    +7

    Я чего-то не понимаю. Но в чём причина писать такое, вместо того, что бы написать одну строчку composer require cboden/ratchet? Это всё, что надо сделать, чтобы включить полную (т.е. включая устаревшие протоколы) поддержку сокетов к себе в проект.


    P.S. По-моему с приходом Ratchet — PhpDaemon ну не то чтобы умер, но в предсмертном состоянии. Код первого на порядок качественнее, имхо.


    1. Fesor
      26.05.2016 20:59
      -1

      Поправка, с появлением решений вроде reactphp phpdaemon в принципе стал ненужным. Да и не поддерживается он уже довольно давно.


  1. evnuh
    26.05.2016 21:08
    -3

    Почему PHP? Если вам нужно неумирающее приложение с сохранением состояния (сервер игры), многопоточная математика и вебсокеты — то PHP — это худшее решение для вашей задачи.


    1. SerafimArts
      26.05.2016 21:09
      +4

      Не хочу показаться излишне грубым, но вы немного отстали от жизни =)


      UPD: А вообще хочется аргументации почему вы так считаете? PHP7 кушает раза в 4-5 меньше оперативки, нежели nodejs, многопоточность и асинхроные плюшки есть, GC (по крайней мере в той же 7ке) наверное один из лучших, что есть на рынке. И т.д. А с приходом JIT, который сейчас в девел веточке...


      1. evnuh
        26.05.2016 21:50
        +1

        Ну, многопоточность там в зачаточном состоянии. Я так понимаю, почти всё в PHP не thread-safe, то есть локи и сериализация доступа? Как с дсотупом к коллекциям? Лочить всю коллекцию? Могу ошибаться, поправьте. Какая модель памяти? О lock-free я так понимаю тоже можно только мечтать? А какой GC в PHP7? RC? Stop-the-world?
        Я сторонник производительности, и для меня писать реалтаймовый сервер игры на скриптовом языке — уже сомнительная затея.


        1. SerafimArts
          26.05.2016 23:11
          +1

          О, вот тут вы меня в тупик поставили. Я просто не представляю как устроены pthreads, т.к. работал с ними исключительно сквозь вышеупомянутый react.


          На счёт GC — схема простая, у пыха нет явного подсчёта (т.е. вызов gc_collect_cycles только руками) ссылок, как вследствие — фризов освобождения оной — тоже, просто при создании нового объекта под него выделяется уже неиспользуемая память. В результате получаем да, небольшие задержки при создании объектов и постоянно растущую память, когда объекты не высвобождаются, с другой стороны — вполне эффективный рантайм без явных проседаний от очистки памяти. Лично мне не удавалось вообще заспамить оперативку в рантайме, при явном отсутсвии "заботы об удалении ненужного", скажем так. Думаю, что более подробную информацию по этому вопросу может предоставить Дмитрий Стогов, я просто не компетентен во внутренностях, но на практике — оно быстро работает, без фризов и "левых" утечек, на которые я напарывался при работе с джавой.


          Ну когда действительно важна производительность — да, плюсы вне конкуренции. Это вообще другой уровень и другие требования. Но сабжевая игра написана для ВК, так что и требования соответсвующие.


          1. Fesor
            26.05.2016 23:28

            да, плюсы вне конкуренции

            А как же Си или Rust? По производительности они выигрывают. Последний в принципе неплох и в плане скорости разработки.


            1. SerafimArts
              26.05.2016 23:36

              В моём случае название языка было использовано в качестве нарицательного. Имел ввиду любой язык, который предоставляет прямой доступ к памяти и стеку, и не делает львиную долю работы по её управлению за того, кто использует какой-либо язык. Короче любой +- низкоуровневый. Да даже Go можно, тоже вполне себе быстрая штука.


  1. dmalchenko
    27.05.2016 11:44

    У меня вопрос, может кто знает какие крупные проекты используют WebSocket и long-polling? Знаю, что ВК использует и то и другое. WS хорош, но сложен в реализации. Хотелось бы узнать как long-polling работает с высокими нагрузками? (при хорошо настроенном сервере)


    1. SerafimArts
      27.05.2016 12:16

      ВК использует сокеты? Где? о_0 Для сообщенек и новостей long-polling only


      1. dmalchenko
        27.05.2016 12:22

        спасибо, long-polling видел в ВК в живую, про WS нашел на просторах гугла. Возможно знаете про другие крупные проекты? Одноклассники тоже сидят на long-polling.


        1. SerafimArts
          27.05.2016 13:39

          Из того что знаю: Slack, Gitter, GoodGame (чатик), Twitch (чатик) на ws. Возможно и чатик Youtube тоже (это вполне было бы логично), но не уверен.


          1. dmalchenko
            27.05.2016 14:07

            Спасибо! Может посоветуете что использовать при больших нагрузках (например одновременно 5-10к). Как относитесь к Server-Sent Events?


            1. SerafimArts
              27.05.2016 16:16

              Опыта с реально высокой нагрузкой + реалтаймом не было. Так что тут я не советчик.


    1. Fesor
      29.05.2016 00:06

      Насколько я помню вконтактики да фэйсбуки используют long polling просто потому что многие сидят через прокси (с работы например) которые режут весь не http траффик. Так же long polling можно считать stateless и это намного удобнее в плане распределения нагрузок.


      1. SerafimArts
        29.05.2016 04:52

        <irony>После беглого просмотра фконтактовских исходников kphp (https://github.com/vk-com/kphp-kdb) — я предполагаю, что у них, в частности, немного иная причина использования лонгполлинга.</irony>


  1. morozovsk
    30.05.2016 15:50

    anlide, в этой статье вы неоднократно даёте ссылку на мою статью Делаем вебсокеты на PHP с нуля, но это только первая статья из трёх на эту тему.
    Вторая часть статьи: IPC. Межпроцессное взаимодействие
    Третья часть статьи: От чата до игры: Battle City
    Все они были написаны в начале 2014 года, а по их результатам создана библиотека и опубликована на гитхабе, в которой уже было учтено всё, что вы описываете ещё в начале 2014 года.
    На всякий случай я попросил друга проверить работу моей библиотеки в Safari и она работает. В той же википедии пишут, что вебсокеты версии 13 (теперь эта версия закреплена в RFC 6455) поддерживаются в Apple Safari начиная с версии 6, которая вышла 2012 году, за два года до написания моих статей.

    Ссылка на библиотеку также есть в каждой статье. Библиотека поддерживает только протокол вебсокетов и не поддерживает long polling, т.к. подавляющее большинство браузеров поддерживает вебсокеты, о чём я писал в третьей статье и приводил цифры (а это было напомню ещё в начале 2014 года, а сейчас 2016 год на дворе).

    Также на странице библиотеки приведены ссылки на демки игр, которые физически не могли бы работать, если бы обладали описанными вами изъянами.

    В вашей реализации вы используете socket_select, что не даёт возможности обслуживать более тысячи одновременных соединений без перекомпиляции php. В моей библиотеке также поддерживаются расширения pecl/event и pecl/libevent, что устраняет эти ограничения.
    И да, моя библиотека протестирована на php седьмой версии и используется в продакшене уже несколько лет и не только мной, а демки висели без перезапуска больше года (до момента смены мною хостинга) и утечек памяти за этот период зафиксировано не было.

    Сложилось впечатление, что статья была написана 2 года назад, а опубликована сейчас. Прошу добавить мой комментарий в конец статьи в качестве опровержения, чтобы не вводить читателей в заблуждение.


    1. anlide
      30.05.2016 17:43
      +1

      Спасибо за разъяснения, мне много стало понятнее.

      22 янв 2016 я подал очередную заявку на публикацию игры в vk. По логам вижу: зашёл админ и сразу закрыл игру. Вот его useragent «Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36». У меня windows. Скачал последнюю версию safari, что нашёл в инете на тот момент: 5.1.7 версии.

      Как теперь видно — админ vk не поймал ошибку соединения, но ему что-то другое не правилось, а я поймал такую ошибку. В итоге сделал неправильный вывод (что websocket под safari не сработал) и неправильные дальнейшие мои действия.

      Должен признать, что это мой огромный провал.

      В статье вы используете stream_select, а в коде на github используете EventBuffer. Но впрочем я уже созрел написать на тему других проблем, с которыми я столкнулся (правильное использование cocos js и вынесение задач в отдельный процесс). Но… а что за ограничения socket_select/stream_select на количество соединений?


      1. morozovsk
        30.05.2016 19:06

        Если в socket_select передавать дескрипторы с номером больше 1024 (т.е. открыто более 1024 соединений), то выдаётся такая ошибка:
        PHP Warning: socket_select(): You MUST recompile PHP with a larger value of FD_SETSIZE. It is set to 1024, but you have descriptors numbered at least as high as 1024.

        И судя по всему указанный вами user-agent принадлежит не сафари как могло показаться на первый взгляд, а хрому, что вполне логично — тестировать работоспособность приложения в самом используемом браузере.
        http://www.useragentstring.com/Chrome35.0.1916.47_id_19788.php
        https://github.com/gorhill/uMatrix/wiki/Latest-user-agent-strings
        http://www.useragentstring.com/pages/Chrome/
        http://www.webapps-online.com/online-tools/user-agent-strings/dv/operatingsystem51847/os-x


        1. anlide
          02.06.2016 03:00
          +1

          Большое спасибо за замечание. Долго думал, что с этим делать. На собственном хостинге увеличивать количество файловых дейскрипторов для любого приложения можно без проблем. Поэтому видимо надо просто следить за этой проблемой и должный report об этой ошибке сделать. И под нагрузкой socket_select использовать можно будет с таким подходом. Да?

          Игра была запущена по схеме «тихий старт». То есть нигде не постится новость о новой игре, а в недрах раздела «все игры» — собственно и появляется игра. На данный момент посмотрело игру ~1750 человек. Google analytic пишет, что safary с версией 5 было аж 1 пользователь за всё время (ниже пятой версии пользователей не было). Очевидно это был мой браузер. И проблемы со старым протоколом на самом деле неактуальны.


          1. morozovsk
            02.06.2016 14:39

            Эта ошибка пользователя, а не баг в php (не нужно никуда писать репорты), именно поэтому генерируется PHP Warning, а не падает сам php. Эта ошибка сообщает пользователю, что socket_select нельзя использовать с дескрипторами с номером больше 1024. Это сделано потому что php-функция socket_select делает системный вызов select, а он работает используя (насколько помню) системный вызов poll, который не очень эффективный, особенно на большом количестве соединений. В системе есть более эффективные методы обработки — epoll, kqueue, kevent и т.д.
            Конечно можно перекомпилировать php с увеличенной константой FD_SETSIZE, но тогда будет неэффективно использоваться процессор. Гораздо лучше использовать расширение pecl/event (которое уже обновили для его работы с php7).
            pecl/event — это обёртка над системной библиотекой libevent (которую использует memcached, chrome, tor и т.д.)

            В своей библиотеке я добавил поддержку pecl/event и возможность переключаться на неё с socket_select и обратно. Это помогает новичкам запустить свой вебсокет-сервер без установки дополнительных модулей, а когда нагрузка увеличится, то донастроить сервер и переключиться на более эффективный механизм.


            1. anlide
              03.06.2016 03:37

              В вашем коде я вижу 3 реализации WebSocket — socket_select, event, libevent. Чем отличаются event и libevent? Какой лучше выбрать? Почему?
              На первый взгляд libevent больше нравится — потому что PhpStorm о нём знает, а про event — нет. :)


              1. morozovsk
                03.06.2016 08:35

                выбор очевиден в пользу event, потому что libevent в стадии beta, не обновлялся 3 года и не работает с php 7.


  1. akubintsev
    02.06.2016 17:24

    Я так и не понял в чем проблема с Сафари.
    Делал чатик с помощью Ratchet, замечательно работает под этим браузером.


    1. anlide
      02.06.2016 17:50

      Уже разобрались. Пол года назад, последняя версия safary под windows была пятёрка. Под ней нужен старый протокол. Кстати Ratchet его поддерживает.
      Но кстати я изучил код Ratchet и он не поддерживает не шифрованные (masked) данные. А в протоколе указано как поддерживать это (и PhpDeamon и код приведенный тут умеет это). Может ещё чего всплывёт по ходу ковыряния его исходника.
      Ну вобщем я статью обновлю — как все вылезшие подробности прояснятся.


      1. akubintsev
        02.06.2016 18:45

        На мой взгляд это не такая серьезная проблема, хотя реализация не помешала бы. Достаточно использовать WSS и тут, правда, без nginx никак, а также настроить фильтрацию origin domain.


  1. Mannaward91
    02.06.2016 20:01

    Спасибо за статью, очень жду ссылку на github.


  1. Mannaward91
    02.06.2016 20:38

    Метод read_lint() содержит ошибку — что мы читаем данные тела http запроса, хотя должны были читать только заголовки.
    В основном теле цикла — не корректное использование указателей при переключении протокола.

    Если кто-нить исправил эти ошибочки, скиньте кусочки кода пожалуйста.


  1. anlide
    04.06.2016 04:22

    Запостил на github https://github.com/anlide/websocket
    Ещё надо будет с двумя вопросами разобраться — это socket_select под нагрузкой и причина закрытия сокета. Ещё расставить кучу комментариев по коду и вопрос можно будет закрывать.


    1. Fesor
      04.06.2016 10:29
      +1

      socket_select под нагрузкой

      А в чем именно вопрос? socket_select под нагрузкой нормально справляется. Не так как libev например (его можно заюзать) но норм. Заменять бы я рекомендовал просто на готовую библиотеку для реализации event loop. Более оптимального решения с точки зрения нагрузок для задач в духе web-сокетов как бы и нет.


      Ещё расставить кучу комментариев по коду и вопрос можно будет закрывать.

      Лучше пользуйтесь осмысленными названиями переменных и методов, а так же выносите части в приватные методы, используйте константы и все такое и комментарии будут просто не нужны.


      1. anlide
        04.06.2016 14:44

        https://habrahabr.ru/post/301822/#comment_9635078 вот жи человек говорит, что 1000 соединений и тю-тю, надо пересобирать php. И при этом он ещё и медленный. Так где же правда?

        На тему комментариев — я с вами согласен. Наверное стоит поудалять все комментарии, написать свои только в сложных местах, в большей части кода сделать говорящие названия переменных.


        1. Fesor
          04.06.2016 19:30

          Ну если у вас более 1000 соединений, тут проще использовать libev или libevent. Экстеншены ставить из пекла не особо то сложно, в обращении они не намного сложнее stream_select, так что...


          А по поводу пересборки — это не такая большая проблема на самом деле) Особенно если использовать Docker — один раз собрал и хорошо.


          1. morozovsk
            05.06.2016 01:01

            libevent — не вариант, об этом я уже выше писал https://habrahabr.ru/post/301822/#comment_9641462
            по поводу пересборки — это всё таки проблема. Совсем недавно вышла php 7.0.7, а до неё 7.0.6 и 7.0.5 и т.д. и все они — «security release», так что перекомпилировать каждый раз php вместо того чтобы обновить из пакета — это лишние действия, а если не обновлять то будет решето.
            За libev спасибо, не знал, хотя его автор сделал также и event, который я использовал ранее.