|
|||||||
Websocket в продакшене
Время создания: 07.05.2018 22:30
Автор: https://habr.com/users/anlide/
Текстовые метки: websocket
Раздел: WebSocket
Запись: Velonski/mytetra-database/master/base/1525714200s4su947w3i/text.html на raw.githubusercontent.com
|
|||||||
|
|||||||
чала клиентсная часть <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> <title>WebSocket test page</title> </head> <body onload="create();"> <script type="text/javascript"> function create() { // Example ws = new WebSocket('ws://'+document.domain+':8081/'); ws.onopen = function () {document.getElementById('log').innerHTML += 'WebSocket opened <br/>';} ws.onmessage = function (e) {document.getElementById('log').innerHTML += 'WebSocket message: '+e.data+' <br/>';} ws.onclose = function () {document.getElementById('log').innerHTML += 'WebSocket closed <br/>';} } </script> <button onclick="create();">Create WebSocket</button> <button onclick="ws.send('ping');">Send ping</button> <button onclick="ws.close();">Close WebSocket</button> <div id="log" style="width:300px; height: 300px; border: 1px solid #999999; overflow:auto;"></div> </body> </html> В моей игре мне пришлось использовать 3 сокет сервера. Для websocket, для worker`ов и для longpooling. В игре очень много математики, поэтому надо было делать вёркеры и выдавать им задачи на вычисления. Так вот к чему это. Что stream_select для них всех должен быть общий, иначе будут лаги или безумное использование процессора. Это знание тоже было получено взамен кучи истраченных нервов. Основной цикл сервиса $master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr); if (!$master) die("$errstr ($errno)\n"); $sockets = array($master); stream_set_blocking($master, false); // Относительно этой команды я не уверен, потому что мастер из сокетов читает только новые соединения, и для чтения используется "stream_socket_accept". Вариант, что весь сервис будет подвешен на несколько секунд из-за того, что клиент не торопится соединятся - категорически неприемлемо. while (true) { $read = $sockets; $write = $except = array(); if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) { var_dump('stream_select error'); break; // Сделать выход из цикла, а не "die", потому что в продакшине скорей всего этот код будет выполняться как сервис и при команде "/etc/init.d/game restart" тут 100% будет этот case, так вот надо дать "pcntl" код нормально отработать и не мешать ему. } foreach ($read as $socket) { $index_socket = array_search($socket, $sockets); if ($index_socket == 0) { // Новое соединение continue; } // Тут будет обработка сообщений клиентов } } Соединение с новыми клиентами вполне себе стандартный код, но вот из-за того, что сокеты у нас в неблокирующем режиме, нужно написать кучу кода, который по кусочкам соберёт все входящие данные и, когда данных будет достаточно, обработает их, поймёт какой протокол надо использовать и переключится на использование этого протокола. Одна эта задача — уже гора кода, и в PhpDeamon нагородили много кода, который к WebSocket отношения не имеет (они же там 8 разных серверов умеют подымать). Удалось многое отрезать и упростить в этой теме. Оставил только то, что относится к WebSocket. Файл урезанный <ws.php> class ws { const MAX_BUFFER_SIZE = 1024 * 1024; protected $socket; /** * @var array _SERVER */ public $server = []; protected $headers = []; protected $closed = false; protected $unparsed_data = ''; private $current_header; private $unread_lines = array(); protected $extensions = []; protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS'; /** * @var integer Current state */ protected $state = 0; // stream state of the connection (application protocol level) /** * Alias of STATE_STANDBY */ const STATE_ROOT = 0; /** * Standby state (default state) */ const STATE_STANDBY = 0; /** * State: first line */ const STATE_FIRSTLINE = 1; /** * State: headers */ const STATE_HEADERS = 2; /** * State: content */ const STATE_CONTENT = 3; /** * State: prehandshake */ const STATE_PREHANDSHAKE = 5; /** * State: handshaked */ const STATE_HANDSHAKED = 6; public function get_state() { return $this->state; } public function closed() { return $this->closed; } protected function close() { if ($this->closed) return; var_dump('self close'); fclose($this->socket); $this->closed = true; } public function __construct($socket) { stream_set_blocking($socket, false); $this->socket = $socket; } private function read_line() { $lines = explode(PHP_EOL, $this->unparsed_data); $last_line = $lines[count($lines)-1]; unset($lines[count($lines) - 1]); foreach ($lines as $line) { $this->unread_lines[] = $line; } $this->unparsed_data = $last_line; if (count($this->unread_lines) != 0) { return array_shift($this->unread_lines); } else { return null; } } public function on_receive_data() { if ($this->closed) return; $data = stream_socket_recvfrom($this->socket, MAX_BUFFER_SIZE); if (is_string($data)) { $this->unparsed_data .= $data; } } /** * Called when new data received. * @return void */ public function on_read() { if ($this->closed) return; if ($this->state === self::STATE_STANDBY) { $this->state = self::STATE_FIRSTLINE; } if ($this->state === self::STATE_FIRSTLINE) { if (!$this->http_read_first_line()) { return; } $this->state = self::STATE_HEADERS; } if ($this->state === self::STATE_HEADERS) { if (!$this->http_read_headers()) { return; } if (!$this->http_process_headers()) { $this->close(); return; } $this->state = self::STATE_CONTENT; } if ($this->state === self::STATE_CONTENT) { $this->state = self::STATE_PREHANDSHAKE; } } /** * Read first line of HTTP request * @return boolean|null Success */ protected function http_read_first_line() { if (($l = $this->read_line()) === null) { return null; } $e = explode(' ', $l); $u = isset($e[1]) ? parse_url($e[1]) : false; if ($u === false) { $this->bad_request(); return false; } if (!isset($u['path'])) { $u['path'] = null; } if (isset($u['host'])) { $this->server['HTTP_HOST'] = $u['host']; } $srv = & $this->server; $srv['REQUEST_METHOD'] = $e[0]; $srv['REQUEST_TIME'] = time(); $srv['REQUEST_TIME_FLOAT'] = microtime(true); $srv['REQUEST_URI'] = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : ''); $srv['DOCUMENT_URI'] = $u['path']; $srv['PHP_SELF'] = $u['path']; $srv['QUERY_STRING'] = isset($u['query']) ? $u['query'] : null; $srv['SCRIPT_NAME'] = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/'; $srv['SERVER_PROTOCOL'] = isset($e[2]) ? $e[2] : 'HTTP/1.1'; $srv['REMOTE_ADDR'] = null; $srv['REMOTE_PORT'] = null; return true; } /** * Read headers line-by-line * @return boolean|null Success */ protected function http_read_headers() { while (($l = $this->read_line()) !== null) { if ($l === '') { return true; } $e = explode(': ', $l); if (isset($e[1])) { $this->current_header = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_'])); $this->server[$this->current_header] = $e[1]; } elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) { // multiline header continued $this->server[$this->current_header] .= $e[0]; } else { // whatever client speaks is not HTTP anymore $this->bad_request(); return false; } } } /** * Process headers * @return bool */ protected function http_process_headers() { $this->state = self::STATE_PREHANDSHAKE; if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) { $str = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']); $str = preg_replace($this->extensionsCleanRegex, '', $str); $this->extensions = explode(', ', $str); } if (!isset($this->server['HTTP_CONNECTION']) || (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade") || !isset($this->server['HTTP_UPGRADE']) || (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important ) { $this->close(); return false; } if (isset($this->server['HTTP_COOKIE'])) { self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie); } if (isset($this->server['QUERY_STRING'])) { self::parse_str($this->server['QUERY_STRING'], $this->get); } // ---------------------------------------------------------- // Protocol discovery, based on HTTP headers... // ---------------------------------------------------------- if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14) $this->switch_to_protocol('v13'); } elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol $this->switch_to_protocol('v13'); } else { error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr $this->close(); return false; } } elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) { $this->switch_to_protocol('ve'); } else { // Defaulting to HIXIE (Safari5 and many non-browser clients...) $this->switch_to_protocol('v0'); } // ---------------------------------------------------------- // End of protocol discovery // ---------------------------------------------------------- return true; } private function switch_to_protocol($protocol) { $class = 'ws_'.$protocol; $this->new_instance = new $class($this->socket); $this->new_instance->state = $this->state; $this->new_instance->unparsed_data = $this->unparsed_data; $this->new_instance->server = $this->server; } /** * Send Bad request * @return void */ public function bad_request() { $this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>"); $this->close(); } /** * Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX * @param string $s String to parse * @param array &$var Reference to the resulting array * @param boolean $header Header-style string * @return void */ public static function parse_str($s, &$var, $header = false) { static $cb; if ($cb === null) { $cb = function ($m) { return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8')); }; } if ($header) { $s = strtr($s, self::$hvaltr); } if ( (stripos($s, '%u') !== false) && preg_match('~(%u[a-f\d]{4}|%[c-f][a-f\d](?!%[89a-f][a-f\d]))~is', $s, $m) ) { $s = preg_replace_callback('~%(u[a-f\d]{4}|[a-f\d]{2})~i', $cb, $s); } parse_str($s, $var); } /** * Send data to the connection. Note that it just writes to buffer that flushes at every baseloop * @param string $data Data to send * @return boolean Success */ public function write($data) { if ($this->closed) return false; return stream_socket_sendto($this->socket, $data) == 0; } } Смысл этого класса в таком урезанном виде — в конструкторе установить неблокирующий режим для соединения с клиентом. Далее в основном цикле, каждый раз, когда приходят данные — сразу их прочитать и положить (дополнить) в «unparsed_data» переменную (это метод on_receive_data). Важно понимать, что если мы выйдем за размеры MAX_BUFFER_SIZE, то вообще ничего страшного не случится. Можно в итоговом примере, что тут будет, поставить его значение, скажем, «5» и убедится, что всё по-прежнему работает. Просто данные из буфера на первом шаге будут проигнорированы — они ведь неполные будут, и со второго или пятого или сотого захода наберутся, наконец, все принятые данные и будут обработаны. При этом stream_select в основном цикле ждать не будет даже микросекунды, пока все данные не будут извлечены. Константу надо подобрать такую, чтобы 95% ожидаемых данных читались целиком. Далее в основном цикле (после получения очередной порции данных) мы пробуем накопленные данные обработать (это метод on_read). В классе «ws» метод «on_read» состоит по сути из трёх шагов: «читаем первую строку и готовим переменные окружения», «читаем все заголовки», «обрабатываем все заголовки». Первые 2 пояснять не надо, но написаны они довольно объёмно потому, что мы в неблокирующем режиме и надо быть готовым к тому, что данные оборваны в любом месте. Обработка заголовков сначала проверяет формат запроса правильный или нет, а потом по заголовкам определяет протокол, по которому будет общаться с клиентом. В итоге должны дёрнуть метод switch_to_protocol. Этот метод внутри себя сформирует экземпляр класса «ws_<протокол>» и подготовит его для отдачи в основной цикл. В основном цикле далее надо собственно проверить: а не надо ли подменить объект (если кто-то может предложить реализацию этого места лучше — всегда пожалуйста). Далее в основном цикле надо поставить проверку: а не закрыт ли сокет. Если закрыт, то очистить память (об этом дальнее в следующем блоке). Теперь полная версия файла <deamon.php> require('ws.php'); require('ws_v0.php'); require('ws_v13.php'); require('ws_ve.php'); $master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr); if (!$master) die("$errstr ($errno)\n"); $sockets = array($master); /** * @var ws[] $connections */ $connections = array(); stream_set_blocking($master, false); /** * @param ws $connection * @param $data * @param $type */ $my_callback = function($connection, $data, $type) { var_dump('my ws data: ['.$data.'/'.$type.']'); $connection->send_frame('test '.time()); }; while (true) { $read = $sockets; $write = $except = array(); if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) { var_dump('stream_select error'); break; } foreach ($read as $socket) { $index_socket = array_search($socket, $sockets); if ($index_socket == 0) { // Новое соединение if ($socket_new = stream_socket_accept($master, -1)) { $connection = new ws($socket_new, $my_callback); $sockets[] = $socket_new; $index_new_socket = array_search($socket_new, $sockets); $connections[$index_new_socket] = &$connection; $index_socket = $index_new_socket; } else { // Я так и не понял что в этом случае надо делать error_log('stream_socket_accept'); var_dump('error stream_socket_accept'); continue; } } $connection = &$connections[$index_socket]; $connection->on_receive_data(); $connection->on_read(); if ($connection->get_state() == ws::STATE_PREHANDSHAKE) { $connection = $connection->get_new_instance(); $connections[$index_socket] = &$connection; $connection->on_read(); } if ($connection->closed()) { unset($sockets[$index_socket]); unset($connections[$index_socket]); unset($connection); var_dump('close '.$index_socket); } } } Тут добавлен "$my_callback" — это наш custom обработчик сообщений от клиента. Разумеется в продакшине можно завернуть это всё в объекты всякие, а тут чтобы было понятнее просто переменная-функция. О ней чуть позже подробнее. Реализована обработка нового соединения и реализовано основное тело цикла, о котором я чуть выше писал. Я хочу обратить внимание на код сервера тут. Что если прочтённые данные из сокета — это пустая строка (да, разумеется я видел там в update проверку на пустую строку), то сокет надо закрыть. Ох, я даже не знаю, сколько этот момет попил мне кровушки и скольких пользователей я потерял. Внезапнейшим образом Сафари отправляет пустую строку и считает это нормой, а этот код берёт и закрывает соединение пользователю. Яндекс-браузер иногда ведёт себя так же. Уж не знаю почему, но в этом случае для Сафари WebSocket остаётся зависшим, то есть он не закрывается, не открывается — просто висит и всё. Вы уже заметили, что я неравнодушен к этому волшебному браузеру? Мне вспоминается, как я верстал под IE6 — примерно такие же ощущения. Теперь о том, зачем я использую array_search и синхронизирую массив $sockets и массив $connections. Дело в том, что stream_select жизненно необходим чистый массив $sockets и никак иначе. Но как-то надо же связать конкретный сокет из массива $sockets с объектом «ws». Перепробовал кучу вариантов — в итоге остановился на таком варианте, что есть 2 массива, которые постоянно синхронизированы по ключам. В одном массиве неоходимые чистые сокеты для stream_select, а во втором экземпляры класса «ws» или его наследники. Если кто-то может предложить это место лучше — предлагайте. Ещё отдельно надо отметить случай, когда stream_socket_accept зафэйлился. Я так понимаю, теоретически это может быть только в том случае, если мастер сокет у нас в неблокирующем режиме, и приехало недостаточно данных для соединения клиента. Поэтому просто ничего не делаем. Полная версия файла <ws.php> class ws { private static $hvaltr = ['; ' => '&', ';' => '&', ' ' => '%20']; const maxAllowedPacket = 1024 * 1024 * 1024; const MAX_BUFFER_SIZE = 1024 * 1024; protected $socket; /** * @var array _SERVER */ public $server = []; protected $on_frame_user = null; protected $handshaked = false; protected $headers = []; protected $headers_sent = false; protected $closed = false; protected $unparsed_data = ''; private $current_header; private $unread_lines = array(); /** * @var ws|null */ private $new_instance = null; protected $extensions = []; protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS'; /** * @var integer Current state */ protected $state = 0; // stream state of the connection (application protocol level) /** * Alias of STATE_STANDBY */ const STATE_ROOT = 0; /** * Standby state (default state) */ const STATE_STANDBY = 0; /** * State: first line */ const STATE_FIRSTLINE = 1; /** * State: headers */ const STATE_HEADERS = 2; /** * State: content */ const STATE_CONTENT = 3; /** * State: prehandshake */ const STATE_PREHANDSHAKE = 5; /** * State: handshaked */ const STATE_HANDSHAKED = 6; public function get_state() { return $this->state; } public function get_new_instance() { return $this->new_instance; } public function closed() { return $this->closed; } protected function close() { if ($this->closed) return; var_dump('self close'); fclose($this->socket); $this->closed = true; } public function __construct($socket, $on_frame_user = null) { stream_set_blocking($socket, false); $this->socket = $socket; $this->on_frame_user = $on_frame_user; } private function read_line() { $lines = explode(PHP_EOL, $this->unparsed_data); $last_line = $lines[count($lines)-1]; unset($lines[count($lines) - 1]); foreach ($lines as $line) { $this->unread_lines[] = $line; } $this->unparsed_data = $last_line; if (count($this->unread_lines) != 0) { return array_shift($this->unread_lines); } else { return null; } } public function on_receive_data() { if ($this->closed) return; $data = stream_socket_recvfrom($this->socket, self::MAX_BUFFER_SIZE); if (is_string($data)) { $this->unparsed_data .= $data; } } /** * Called when new data received. * @return void */ public function on_read() { if ($this->closed) return; if ($this->state === self::STATE_STANDBY) { $this->state = self::STATE_FIRSTLINE; } if ($this->state === self::STATE_FIRSTLINE) { if (!$this->http_read_first_line()) { return; } $this->state = self::STATE_HEADERS; } if ($this->state === self::STATE_HEADERS) { if (!$this->http_read_headers()) { return; } if (!$this->http_process_headers()) { $this->close(); return; } $this->state = self::STATE_CONTENT; } if ($this->state === self::STATE_CONTENT) { $this->state = self::STATE_PREHANDSHAKE; } } /** * Read first line of HTTP request * @return boolean|null Success */ protected function http_read_first_line() { if (($l = $this->read_line()) === null) { return null; } $e = explode(' ', $l); $u = isset($e[1]) ? parse_url($e[1]) : false; if ($u === false) { $this->bad_request(); return false; } if (!isset($u['path'])) { $u['path'] = null; } if (isset($u['host'])) { $this->server['HTTP_HOST'] = $u['host']; } $address = explode(':', stream_socket_get_name($this->socket, true)); //получаем адрес клиента $srv = & $this->server; $srv['REQUEST_METHOD'] = $e[0]; $srv['REQUEST_TIME'] = time(); $srv['REQUEST_TIME_FLOAT'] = microtime(true); $srv['REQUEST_URI'] = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : ''); $srv['DOCUMENT_URI'] = $u['path']; $srv['PHP_SELF'] = $u['path']; $srv['QUERY_STRING'] = isset($u['query']) ? $u['query'] : null; $srv['SCRIPT_NAME'] = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/'; $srv['SERVER_PROTOCOL'] = isset($e[2]) ? $e[2] : 'HTTP/1.1'; $srv['REMOTE_ADDR'] = $address[0]; $srv['REMOTE_PORT'] = $address[1]; return true; } /** * Read headers line-by-line * @return boolean|null Success */ protected function http_read_headers() { while (($l = $this->read_line()) !== null) { if ($l === '') { return true; } $e = explode(': ', $l); if (isset($e[1])) { $this->current_header = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_'])); $this->server[$this->current_header] = $e[1]; } elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) { // multiline header continued $this->server[$this->current_header] .= $e[0]; } else { // whatever client speaks is not HTTP anymore $this->bad_request(); return false; } } } /** * Process headers * @return bool */ protected function http_process_headers() { $this->state = self::STATE_PREHANDSHAKE; if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) { $str = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']); $str = preg_replace($this->extensionsCleanRegex, '', $str); $this->extensions = explode(', ', $str); } if (!isset($this->server['HTTP_CONNECTION']) || (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade") || !isset($this->server['HTTP_UPGRADE']) || (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important ) { $this->close(); return false; } /* if (isset($this->server['HTTP_COOKIE'])) { self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie); } if (isset($this->server['QUERY_STRING'])) { self::parse_str($this->server['QUERY_STRING'], $this->get); } */ // ---------------------------------------------------------- // Protocol discovery, based on HTTP headers... // ---------------------------------------------------------- if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14) $this->switch_to_protocol('v13'); } elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol $this->switch_to_protocol('v13'); } else { error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr $this->close(); return false; } } elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) { $this->switch_to_protocol('ve'); } else { // Defaulting to HIXIE (Safari5 and many non-browser clients...) $this->switch_to_protocol('v0'); } // ---------------------------------------------------------- // End of protocol discovery // ---------------------------------------------------------- return true; } private function switch_to_protocol($protocol) { $class = 'ws_'.$protocol; $this->new_instance = new $class($this->socket); $this->new_instance->state = $this->state; $this->new_instance->unparsed_data = $this->unparsed_data; $this->new_instance->server = $this->server; $this->new_instance->on_frame_user = $this->on_frame_user; } /** * Send Bad request * @return void */ public function bad_request() { $this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>"); $this->close(); } /** * Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX * @param string $s String to parse * @param array &$var Reference to the resulting array * @param boolean $header Header-style string * @return void */ public static function parse_str($s, &$var, $header = false) { static $cb; if ($cb === null) { $cb = function ($m) { return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8')); }; } if ($header) { $s = strtr($s, self::$hvaltr); } if ( (stripos($s, '%u') !== false) && preg_match('~(%u[a-f\d]{4}|%[c-f][a-f\d](?!%[89a-f][a-f\d]))~is', $s, $m) ) { $s = preg_replace_callback('~%(u[a-f\d]{4}|[a-f\d]{2})~i', $cb, $s); } parse_str($s, $var); } /** * Send data to the connection. Note that it just writes to buffer that flushes at every baseloop * @param string $data Data to send * @return boolean Success */ public function write($data) { if ($this->closed) return false; return stream_socket_sendto($this->socket, $data) == 0; } /** * Будте любезны в отнаследованном классе реализовать этот метод * @return bool */ protected function send_handshake_reply() { return false; } /** * Called when we're going to handshake. * @return boolean Handshake status */ public function handshake() { $extra_headers = ''; foreach ($this->headers as $k => $line) { if ($k !== 'STATUS') { $extra_headers .= $line . "\r\n"; } } if (!$this->send_handshake_reply($extra_headers)) { error_log(get_class($this) . '::' . __METHOD__ . ' : Handshake protocol failure for client ""'); // $this->addr $this->close(); return false; } $this->handshaked = true; $this->headers_sent = true; $this->state = static::STATE_HANDSHAKED; return true; } /** * Read from buffer without draining * @param integer $n Number of bytes to read * @param integer $o Offset * @return string|false */ public function look($n, $o = 0) { if (strlen($this->unparsed_data) <= $o) { return ''; } return substr($this->unparsed_data, $o, $n); } /** * Convert bytes into integer * @param string $str Bytes * @param boolean $l Little endian? Default is false * @return integer */ public static function bytes2int($str, $l = false) { if ($l) { $str = strrev($str); } $dec = 0; $len = strlen($str); for ($i = 0; $i < $len; ++$i) { $dec += ord(substr($str, $i, 1)) * pow(0x100, $len - $i - 1); } return $dec; } /** * Drains buffer * @param integer $n Numbers of bytes to drain * @return boolean Success */ public function drain($n) { $ret = substr($this->unparsed_data, 0, $n); $this->unparsed_data = substr($this->unparsed_data, $n); return $ret; } /** * Read data from the connection's buffer * @param integer $n Max. number of bytes to read * @return string|false Readed data */ public function read($n) { if ($n <= 0) { return ''; } $read = $this->drain($n); if ($read === '') { return false; } return $read; } /** * Reads all data from the connection's buffer * @return string Readed data */ public function read_unlimited() { $ret = $this->unparsed_data; $this->unparsed_data = ''; return $ret; } /** * Searches first occurence of the string in input buffer * @param string $what Needle * @param integer $start Offset start * @param integer $end Offset end * @return integer Position */ public function search($what, $start = 0, $end = -1) { return strpos($this->unparsed_data, $what, $start); } /** * Called when new frame received. * @param string $data Frame's data. * @param string $type Frame's type ("STRING" OR "BINARY"). * @return boolean Success. */ public function on_frame($data, $type) { if (is_callable($this->on_frame_user)) { call_user_func($this->on_frame_user, $this, $data, $type); } return true; } public function send_frame($data, $type = null, $cb = null) { return false; } /** * Get real frame type identificator * @param $type * @return integer */ public function get_frame_type($type) { if (is_int($type)) { return $type; } if ($type === null) { $type = 'STRING'; } $frametype = @constant(get_class($this) . '::' . $type); if ($frametype === null) { error_log(__METHOD__ . ' : Undefined frametype "' . $type . '"'); } return $frametype; } } По сути тут добавлены 3 вещи: «соединение с клиентом на уровне веб сокета», «получение сообщения от клиента», «отправка сообщения клиенту». Для начала немного теории и терминологии. «Handshake» — это с точки зрения веб сокетов процедура установления соединения поверх http. Надо ведь решить кучу вопросов: как пробиться сквозь гущу прокси и кэшэй, как защитится от злых хакеров. И термин «frame» — это кусок данных в расшифрованном виде, это сообщение от клиента или сообщение для клиента. Возможно, об этом стоило написать в начале статьи, но из-за этих вот «frame» делать сокет сервер в блокирующем режиме сокетов имхо бессмысленно. То, как сделан этот момент вот тут — это лишило меня сна не на одну ночь. В той статье не рассматривается вариант, что frame приехал не полностью или их приехало сразу два. И то и то, между прочим, вполне себе типичная ситуация, как показали логи игры. Теперь к деталям. Соединение с клиентом на уровне веб сокета — предполагается, что протокол (например, ws_v0) перекроет метод «on_read» и внутри себя дёрнет «handshake», когда данных будет достаточно. Далее кусок «handshake» в родителе. Далее дёргается метод «send_handshake_reply», который должен быть реализован в протоколе. Этот вот «send_handshake_reply» должен такое ответить клиенту, чтобы тот понял, что «соединение установлено», нормальным браузерам — нормальный ответ, а для Сафари — особый ответ. Получение сообщения от клиента. Обращаю внимание, что глупые клиенты могут реализовать такой вариант, что соединение не установлено, а сообщение от пользователя уже пришло. Поэтому надо бережно относится к «unparsed_data» переменной. В каждом протоколе метод «on_read» должен понять размер передаваемого frame, убедиться, что frame целиком приехал, расшифровать приехавший frame в сообщение пользователя. В каждом протоколе это делается очень по-разному и очень кучеряво (мы ж не знаем, приехал frame полностью или нет, плюс нельзя откусить ни байта следующего frame). Далее внутри «on_read», когда данные клиента получены и расшифрованы и определён их тип (да-да и такое предусмотрено), дёргаем метод «on_frame», который внутри класса «ws», тот, в свою очередь, дёрнет custom callback (функция $my_callback, перед основным циклом которая). И в итоге $my_callback получает сообщение от клиента. Отправка сообщения клиенту. Просто дёргается метод «send_frame», который должен быть реализован внутри протокола. Тут просто шифруем сообщение и отправляем пользователю. Разные протоколы шифруют по-разному. Теперь прилагаю 3 протокола «v13», «v0», «ve»: Файл <ws_v13.php> class ws_v13 extends ws { const CONTINUATION = 0; const STRING = 0x1; const BINARY = 0x2; const CONNCLOSE = 0x8; const PING = 0x9; const PONG = 0xA; protected static $opcodes = [ 0 => 'CONTINUATION', 0x1 => 'STRING', 0x2 => 'BINARY', 0x8 => 'CONNCLOSE', 0x9 => 'PING', 0xA => 'PONG', ]; protected $outgoingCompression = 0; protected $framebuf = ''; /** * Apply mask * @param $data * @param string|false $mask * @return mixed */ public function mask($data, $mask) { for ($i = 0, $l = strlen($data), $ml = strlen($mask); $i < $l; $i++) { $data[$i] = $data[$i] ^ $mask[$i % $ml]; } return $data; } /** * Sends a frame. * @param string $data Frame's data. * @param string $type Frame's type. ("STRING" OR "BINARY") * @param callable $cb Optional. Callback called when the frame is received by client. * @callback $cb ( ) * @return boolean Success. */ public function send_frame($data, $type = null, $cb = null) { if (!$this->handshaked) { return false; } if ($this->closed && $type !== 'CONNCLOSE') { return false; } /*if (in_array($type, ['STRING', 'BINARY']) && ($this->outgoingCompression > 0) && in_array('deflate-frame', $this->extensions)) { //$data = gzcompress($data, $this->outgoingCompression); //$rsv1 = 1; }*/ $fin = 1; $rsv1 = 0; $rsv2 = 0; $rsv3 = 0; $this->write(chr(bindec($fin . $rsv1 . $rsv2 . $rsv3 . str_pad(decbin($this->get_frame_type($type)), 4, '0', STR_PAD_LEFT)))); $dataLength = strlen($data); $isMasked = false; $isMaskedInt = $isMasked ? 128 : 0; if ($dataLength <= 125) { $this->write(chr($dataLength + $isMaskedInt)); } elseif ($dataLength <= 65535) { $this->write(chr(126 + $isMaskedInt) . // 126 + 128 chr($dataLength >> 8) . chr($dataLength & 0xFF)); } else { $this->write(chr(127 + $isMaskedInt) . // 127 + 128 chr($dataLength >> 56) . chr($dataLength >> 48) . chr($dataLength >> 40) . chr($dataLength >> 32) . chr($dataLength >> 24) . chr($dataLength >> 16) . chr($dataLength >> 8) . chr($dataLength & 0xFF)); } if ($isMasked) { $mask = chr(mt_rand(0, 0xFF)) . chr(mt_rand(0, 0xFF)) . chr(mt_rand(0, 0xFF)) . chr(mt_rand(0, 0xFF)); $this->write($mask . $this->mask($data, $mask)); } else { $this->write($data); } if ($cb !== null) { $cb(); } return true; } /** * Sends a handshake message reply * @param string Received data (no use in this class) * @return boolean OK? */ public function send_handshake_reply($extraHeaders = '') { if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY']) || !isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { return false; } if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '13' && $this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '8') { return false; } if (isset($this->server['HTTP_ORIGIN'])) { $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = $this->server['HTTP_ORIGIN']; } if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) { $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = ''; } $this->write("HTTP/1.1 101 Switching Protocols\r\n" . "Upgrade: WebSocket\r\n" . "Connection: Upgrade\r\n" . "Date: " . date('r') . "\r\n" . "Sec-WebSocket-Origin: " . $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] . "\r\n" . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n" . "Sec-WebSocket-Accept: " . base64_encode(sha1(trim($this->server['HTTP_SEC_WEBSOCKET_KEY']) . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true)) . "\r\n" ); if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) { $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n"); } $this->write($extraHeaders."\r\n"); return true; } /** * Called when new data received * @see http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10#page-16 * @return void */ public function on_read() { if ($this->closed) return; if ($this->state === self::STATE_PREHANDSHAKE) { if (!$this->handshake()) { return; } } if ($this->state === self::STATE_HANDSHAKED) { while (($buflen = strlen($this->unparsed_data)) >= 2) { $first = ord($this->look(1)); // first byte integer (fin, opcode) $firstBits = decbin($first); $opcode = (int)bindec(substr($firstBits, 4, 4)); if ($opcode === 0x8) { // CLOSE $this->close(); return; } $opcodeName = isset(static::$opcodes[$opcode]) ? static::$opcodes[$opcode] : false; if (!$opcodeName) { error_log(get_class($this) . ': Undefined opcode ' . $opcode); $this->close(); return; } $second = ord($this->look(1, 1)); // second byte integer (masked, payload length) $fin = (bool)($first >> 7); $isMasked = (bool)($second >> 7); $dataLength = $second & 0x7f; $p = 2; if ($dataLength === 0x7e) { // 2 bytes-length if ($buflen < $p + 2) { return; // not enough data yet } $dataLength = self::bytes2int($this->look(2, $p), false); $p += 2; } elseif ($dataLength === 0x7f) { // 8 bytes-length if ($buflen < $p + 8) { return; // not enough data yet } $dataLength = self::bytes2int($this->look(8, $p)); $p += 8; } if (self::maxAllowedPacket <= $dataLength) { // Too big packet $this->close(); return; } if ($isMasked) { if ($buflen < $p + 4) { return; // not enough data yet } $mask = $this->look(4, $p); $p += 4; } if ($buflen < $p + $dataLength) { return; // not enough data yet } $this->drain($p); $data = $this->read($dataLength); if ($isMasked) { $data = $this->mask($data, $mask); } //Daemon::log(Debug::dump(array('ext' => $this->extensions, 'rsv1' => $firstBits[1], 'data' => Debug::exportBytes($data)))); /*if ($firstBits[1] && in_array('deflate-frame', $this->extensions)) { // deflate frame $data = gzuncompress($data, $this->pool->maxAllowedPacket); }*/ if (!$fin) { $this->framebuf .= $data; } else { $this->on_frame($this->framebuf . $data, $opcodeName); $this->framebuf = ''; } } } } } Файл <ws_v0.php> class ws_v0 extends ws { const STRING = 0x00; const BINARY = 0x80; protected $key; /** * Sends a handshake message reply * @param string Received data (no use in this class) * @return boolean OK? */ public function send_handshake_reply($extraHeaders = '') { if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) { return false; } $final_key = $this->_computeFinalKey($this->server['HTTP_SEC_WEBSOCKET_KEY1'], $this->server['HTTP_SEC_WEBSOCKET_KEY2'], $this->key); $this->key = null; if (!$final_key) { return false; } if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) { $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = ''; } $this->write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" . "Upgrade: WebSocket\r\n" . "Connection: Upgrade\r\n" . "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n" . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n"); if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) { $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n"); } $this->write($extraHeaders . "\r\n" . $final_key); return true; } /** * Computes final key for Sec-WebSocket. * @param string Key1 * @param string Key2 * @param string Data * @return string Result */ protected function _computeFinalKey($key1, $key2, $data) { if (strlen($data) < 8) { error_log(get_class($this) . '::' . __METHOD__ . ' : Invalid handshake data for client ""'); // $this->addr return false; } return md5($this->_computeKey($key1) . $this->_computeKey($key2) . substr($data, 0, 8), true); } /** * Computes key for Sec-WebSocket. * @param string Key * @return string Result */ protected function _computeKey($key) { $spaces = 0; $digits = ''; for ($i = 0, $s = strlen($key); $i < $s; ++$i) { $c = substr($key, $i, 1); if ($c === "\x20") { ++$spaces; } elseif (ctype_digit($c)) { $digits .= $c; } } if ($spaces > 0) { $result = (float)floor($digits / $spaces); } else { $result = (float)$digits; } return pack('N', $result); } /** * Sends a frame. * @param string $data Frame's data. * @param string $type Frame's type. ("STRING" OR "BINARY") * @param callable $cb Optional. Callback called when the frame is received by client. * @callback $cb ( ) * @return boolean Success. */ public function send_frame($data, $type = null, $cb = null) { if (!$this->handshaked) { return false; } if ($this->closed && $type !== 'CONNCLOSE') { return false; } if ($type === 'CONNCLOSE') { if ($cb !== null) { $cb($this); return true; } } $type = $this->get_frame_type($type); // Binary if (($type & self::BINARY) === self::BINARY) { $n = strlen($data); $len = ''; $pos = 0; char: ++$pos; $c = $n >> 0 & 0x7F; $n >>= 7; if ($pos !== 1) { $c += 0x80; } if ($c !== 0x80) { $len = chr($c) . $len; goto char; }; $this->write(chr(self::BINARY) . $len . $data); } // String else { $this->write(chr(self::STRING) . $data . "\xFF"); } if ($cb !== null) { $cb(); } return true; } /** * Called when new data received * @return void */ public function on_read() { if ($this->state === self::STATE_PREHANDSHAKE) { if (strlen($this->unparsed_data) < 8) { return; } $this->key = $this->read_unlimited(); $this->handshake(); } if ($this->state === self::STATE_HANDSHAKED) { while (($buflen = strlen($this->unparsed_data)) >= 2) { $hdr = $this->look(10); $frametype = ord(substr($hdr, 0, 1)); if (($frametype & 0x80) === 0x80) { $len = 0; $i = 0; do { if ($buflen < $i + 1) { // not enough data yet return; } $b = ord(substr($hdr, ++$i, 1)); $n = $b & 0x7F; $len *= 0x80; $len += $n; } while ($b > 0x80); if (self::maxAllowedPacket <= $len) { // Too big packet $this->close(); return; } if ($buflen < $len + $i + 1) { // not enough data yet return; } $this->drain($i + 1); $this->on_frame($this->read($len), 'BINARY'); } else { if (($p = $this->search("\xFF")) !== false) { if (self::maxAllowedPacket <= $p - 1) { // Too big packet $this->close(); return; } $this->drain(1); $data = $this->read($p); $this->drain(1); $this->on_frame($data, 'STRING'); } else { if (self::maxAllowedPacket < $buflen - 1) { // Too big packet $this->close(); return; } // not enough data yet return; } } } } } } Файл <ws_ve.php> class ws_ve extends ws { const STRING = 0x00; const BINARY = 0x80; /** * Sends a handshake message reply * @param string Received data (no use in this class) * @return boolean OK? */ public function send_handshake_reply($extraHeaders = '') { if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) { $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = ''; } $this->write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" . "Upgrade: WebSocket\r\n" . "Connection: Upgrade\r\n" . "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n" . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n" ); if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) { $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n"); } $this->write($extraHeaders."\r\n"); return true; } /** * Computes key for Sec-WebSocket. * @param string Key * @return string Result */ protected function _computeKey($key) { $spaces = 0; $digits = ''; for ($i = 0, $s = strlen($key); $i < $s; ++$i) { $c = substr($key, $i, 1); if ($c === "\x20") { ++$spaces; } elseif (ctype_digit($c)) { $digits .= $c; } } if ($spaces > 0) { $result = (float)floor($digits / $spaces); } else { $result = (float)$digits; } return pack('N', $result); } /** * Sends a frame. * @param string $data Frame's data. * @param string $type Frame's type. ("STRING" OR "BINARY") * @param callable $cb Optional. Callback called when the frame is received by client. * @callback $cb ( ) * @return boolean Success. */ public function send_frame($data, $type = null, $cb = null) { if (!$this->handshaked) { return false; } if ($this->closed && $type !== 'CONNCLOSE') { return false; } if ($type === 'CONNCLOSE') { if ($cb !== null) { $cb($this); return true; } } // Binary $type = $this->get_frame_type($type); if (($type & self::BINARY) === self::BINARY) { $n = strlen($data); $len = ''; $pos = 0; char: ++$pos; $c = $n >> 0 & 0x7F; $n >>= 7; if ($pos !== 1) { $c += 0x80; } if ($c !== 0x80) { $len = chr($c) . $len; goto char; }; $this->write(chr(self::BINARY) . $len . $data); } // String else { $this->write(chr(self::STRING) . $data . "\xFF"); } if ($cb !== null) { $cb(); } return true; } /** * Called when new data received * @return void */ public function on_read() { while (($buflen = strlen($this->unparsed_data)) >= 2) { $hdr = $this->look(10); $frametype = ord(substr($hdr, 0, 1)); if (($frametype & 0x80) === 0x80) { $len = 0; $i = 0; do { if ($buflen < $i + 1) { return; } $b = ord(substr($hdr, ++$i, 1)); $n = $b & 0x7F; $len *= 0x80; $len += $n; } while ($b > 0x80); if (self::maxAllowedPacket <= $len) { // Too big packet $this->close(); return; } if ($buflen < $len + $i + 1) { // not enough data yet return; } $this->drain($i + 1); $this->on_frame($this->read($len), $frametype); } else { if (($p = $this->search("\xFF")) !== false) { if (self::maxAllowedPacket <= $p - 1) { // Too big packet $this->close(); return; } $this->drain(1); $data = $this->read($p); $this->drain(1); $this->on_frame($data, 'STRING'); } else { if (self::maxAllowedPacket < $buflen - 1) { // Too big packet $this->close(); return; } } } } } } Сразу хочу отметить, что протокол VE не тестировал — понятия не имею кто его использует. Но добросовестно сконвертировал и урезал код из PhpDeamon. Протокол V13 используют все нормальные браузеры (FireFox, Opera, Chrome, Яндекс). Даже IE его использует (извините, после IE6 — для меня IE никогда не будет «браузером», даже команда разработчик IE заявляли, что это «не браузер, а тонкий клиент»). Протокол V0 использует браузер «Сафари». Вместо заключения Спасибо за внимание, используйте на здоровье весь приведенный выше код (разумеется, я советую завернуть его в нормальные объекты, тут всё упрощено исключительно для понимания. Особенно callback на пришедший от пользователя frame советую сделать по-нормальному). Если вы будете использовать этот код, напишите пожалуйста где-то в коде «Спасибо Anlide и PhpDeamon». В итоге сокет сервер, приведенный тут, совместим со всеми современными браузерами. Работает без утечек памяти и годится для использования в высоконагруженных системах. Обновление: Комментарий автора статьи, на которую я постоянно ссылаюсь в тексте: habrahabr.ru/post/301822/#comment_9634636 Метод read_lint() содержит ошибку — что мы читаем данные тела http запроса, хотя должны были читать только заголовки. В основном теле цикла — не корректное использование указателей при переключении протокола. По просьбам трудящихся вот ссылка на gitbub github.com/anlide/websocket тут код исправленный и ещё ping-pong доработанный, осталось ещё причину закрытия сокета фиксировать и заменить select на что-то — и будет замечательная смесь лучших серверных решений по websocket. |
|||||||
Так же в этом разделе:
|
|||||||
|
|||||||
|