|
|||||||
Делаем вебсокеты на PHP с нуля
Время создания: 07.05.2018 22:31
Автор: https://habr.com/users/morozovsk/
Текстовые метки: websocket
Раздел: WebSocket
Запись: Velonski/mytetra-database/master/base/1525714312lwx7t6o5s5/text.html на raw.githubusercontent.com
|
|||||||
|
|||||||
чата #!/usr/bin/env php <?php class WebsocketServer { public function __construct($config) { $this->config = $config; } public function start() { //открываем серверный сокет $server = stream_socket_server("tcp://{$this->config['host']}:{$this->config['port']}", $errorNumber, $errorString); if (!$server) { die("error: stream_socket_server: $errorString ($errorNumber)\r\n"); } list($pid, $master, $workers) = $this->spawnWorkers();//создаём дочерние процессы if ($pid) {//мастер fclose($server);//мастер не будет обрабатывать входящие соединения на основном сокете $WebsocketMaster = new WebsocketMaster($workers);//мастер будет пересылать сообщения между воркерами $WebsocketMaster->start(); } else {//воркер $WebsocketHandler = new WebsocketHandler($server, $master); $WebsocketHandler->start(); } } protected function spawnWorkers() { $master = null; $workers = array(); $i = 0; while ($i < $this->config['workers']) { $i++; //создаём парные сокеты, через них будут связываться мастер и воркер $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); $pid = pcntl_fork();//создаём форк if ($pid == -1) { die("error: pcntl_fork\r\n"); } elseif ($pid) { //мастер fclose($pair[0]); $workers[$pid] = $pair[1];//один из пары будет в мастере } else { //воркер fclose($pair[1]); $master = $pair[0];//второй в воркере break; } } return array($pid, $master, $workers); } } class WebsocketMaster { protected $workers = array(); protected $clients = array(); public function __construct($workers) { $this->clients = $this->workers = $workers; } public function start() { while (true) { //подготавливаем массив всех сокетов, которые нужно обработать $read = $this->clients; stream_select($read, $write, $except, null);//обновляем массив сокетов, которые можно обработать if ($read) {//пришли данные от подключенных клиентов foreach ($read as $client) { $data = fread($client, 1000); if (!$data) { //соединение было закрыто unset($this->clients[intval($client)]); @fclose($client); continue; } foreach ($this->workers as $worker) {//пересылаем данные во все воркеры if ($worker !== $client) { fwrite($worker, $data); } } } } } } } abstract class WebsocketWorker { protected $clients = array(); protected $server; protected $master; protected $pid; protected $handshakes = array(); protected $ips = array(); public function __construct($server, $master) { $this->server = $server; $this->master = $master; $this->pid = posix_getpid(); } public function start() { while (true) { //подготавливаем массив всех сокетов, которые нужно обработать $read = $this->clients; $read[] = $this->server; $read[] = $this->master; $write = array(); if ($this->handshakes) { foreach ($this->handshakes as $clientId => $clientInfo) { if ($clientInfo) { $write[] = $this->clients[$clientId]; } } } stream_select($read, $write, $except, null);//обновляем массив сокетов, которые можно обработать if (in_array($this->server, $read)) { //на серверный сокет пришёл запрос от нового клиента //подключаемся к нему и делаем рукопожатие, согласно протоколу вебсокета if ($client = stream_socket_accept($this->server, -1)) { $address = explode(':', stream_socket_get_name($client, true)); if (isset($this->ips[$address[0]]) && $this->ips[$address[0]] > 5) {//блокируем более пяти соединий с одного ip @fclose($client); } else { @$this->ips[$address[0]]++; $this->clients[intval($client)] = $client; $this->handshakes[intval($client)] = array();//отмечаем, что нужно сделать рукопожатие } } //удаляем сервеный сокет из массива, чтобы не обработать его в этом цикле ещё раз unset($read[array_search($this->server, $read)]); } if (in_array($this->master, $read)) { //пришли данные от мастера $data = fread($this->master, 1000); $this->onSend($data);//вызываем пользовательский сценарий //удаляем мастера из массива, чтобы не обработать его в этом цикле ещё раз unset($read[array_search($this->master, $read)]); } if ($read) {//пришли данные от подключенных клиентов foreach ($read as $client) { if (isset($this->handshakes[intval($client)])) { if ($this->handshakes[intval($client)]) {//если уже было получено рукопожатие от клиента continue;//то до отправки ответа от сервера читать здесь пока ничего не надо } if (!$this->handshake($client)) { unset($this->clients[intval($client)]); unset($this->handshakes[intval($client)]); $address = explode(':', stream_socket_get_name($client, true)); if (isset($this->ips[$address[0]]) && $this->ips[$address[0]] > 0) { @$this->ips[$address[0]]--; } @fclose($client); } } else { $data = fread($client, 1000); if (!$data) { //соединение было закрыто unset($this->clients[intval($client)]); unset($this->handshakes[intval($client)]); $address = explode(':', stream_socket_get_name($client, true)); if (isset($this->ips[$address[0]]) && $this->ips[$address[0]] > 0) { @$this->ips[$address[0]]--; } @fclose($client); $this->onClose($client);//вызываем пользовательский сценарий continue; } $this->onMessage($client, $data);//вызываем пользовательский сценарий } } } if ($write) { foreach ($write as $client) { if (!$this->handshakes[intval($client)]) {//если ещё не было получено рукопожатие от клиента continue;//то отвечать ему рукопожатием ещё рано } $info = $this->handshake($client); $this->onOpen($client, $info);//вызываем пользовательский сценарий } } } } protected function handshake($client) { $key = $this->handshakes[intval($client)]; if (!$key) { //считываем загаловки из соединения $headers = fread($client, 10000); preg_match("/Sec-WebSocket-Key: (.*)\r\n/", $headers, $match); if (empty($match[1])) { return false; } $key = $match[1]; $this->handshakes[intval($client)] = $key; } else { //отправляем заголовок согласно протоколу вебсокета $SecWebSocketAccept = base64_encode(pack('H*', sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'))); $upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" . "Upgrade: websocket\r\n" . "Connection: Upgrade\r\n" . "Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n"; fwrite($client, $upgrade); unset($this->handshakes[intval($client)]); } return $key; } protected function encode($payload, $type = 'text', $masked = false) { $frameHead = array(); $payloadLength = strlen($payload); switch ($type) { case 'text': // first byte indicates FIN, Text-Frame (10000001): $frameHead[0] = 129; break; case 'close': // first byte indicates FIN, Close Frame(10001000): $frameHead[0] = 136; break; case 'ping': // first byte indicates FIN, Ping frame (10001001): $frameHead[0] = 137; break; case 'pong': // first byte indicates FIN, Pong frame (10001010): $frameHead[0] = 138; break; } // set mask and payload length (using 1, 3 or 9 bytes) if ($payloadLength > 65535) { $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8); $frameHead[1] = ($masked === true) ? 255 : 127; for ($i = 0; $i < 8; $i++) { $frameHead[$i + 2] = bindec($payloadLengthBin[$i]); } // most significant bit MUST be 0 if ($frameHead[2] > 127) { return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)'); } } elseif ($payloadLength > 125) { $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8); $frameHead[1] = ($masked === true) ? 254 : 126; $frameHead[2] = bindec($payloadLengthBin[0]); $frameHead[3] = bindec($payloadLengthBin[1]); } else { $frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength; } // convert frame-head to string: foreach (array_keys($frameHead) as $i) { $frameHead[$i] = chr($frameHead[$i]); } if ($masked === true) { // generate a random mask: $mask = array(); for ($i = 0; $i < 4; $i++) { $mask[$i] = chr(rand(0, 255)); } $frameHead = array_merge($frameHead, $mask); } $frame = implode('', $frameHead); // append payload to frame: for ($i = 0; $i < $payloadLength; $i++) { $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i]; } return $frame; } protected function decode($data) { $unmaskedPayload = ''; $decodedData = array(); // estimate frame type: $firstByteBinary = sprintf('%08b', ord($data[0])); $secondByteBinary = sprintf('%08b', ord($data[1])); $opcode = bindec(substr($firstByteBinary, 4, 4)); $isMasked = ($secondByteBinary[0] == '1') ? true : false; $payloadLength = ord($data[1]) & 127; // unmasked frame is received: if (!$isMasked) { return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)'); } switch ($opcode) { // text frame: case 1: $decodedData['type'] = 'text'; break; case 2: $decodedData['type'] = 'binary'; break; // connection close frame: case 8: $decodedData['type'] = 'close'; break; // ping frame: case 9: $decodedData['type'] = 'ping'; break; // pong frame: case 10: $decodedData['type'] = 'pong'; break; default: return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)'); } if ($payloadLength === 126) { $mask = substr($data, 4, 4); $payloadOffset = 8; $dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset; } elseif ($payloadLength === 127) { $mask = substr($data, 10, 4); $payloadOffset = 14; $tmp = ''; for ($i = 0; $i < 8; $i++) { $tmp .= sprintf('%08b', ord($data[$i + 2])); } $dataLength = bindec($tmp) + $payloadOffset; unset($tmp); } else { $mask = substr($data, 2, 4); $payloadOffset = 6; $dataLength = $payloadLength + $payloadOffset; } /** * We have to check for large frames here. socket_recv cuts at 1024 bytes * so if websocket-frame is > 1024 bytes we have to wait until whole * data is transferd. */ if (strlen($data) < $dataLength) { return false; } if ($isMasked) { for ($i = $payloadOffset; $i < $dataLength; $i++) { $j = $i - $payloadOffset; if (isset($data[$i])) { $unmaskedPayload .= $data[$i] ^ $mask[$j % 4]; } } $decodedData['payload'] = $unmaskedPayload; } else { $payloadOffset = $payloadOffset - 4; $decodedData['payload'] = substr($data, $payloadOffset); } return $decodedData; } abstract protected function onOpen($client, $info); abstract protected function onClose($client); abstract protected function onMessage($client, $data); abstract protected function onSend($data); abstract protected function send($data); } //пример реализации чата class WebsocketHandler extends WebsocketWorker { protected function onOpen($client, $info) {//вызывается при соединении с новым клиентом } protected function onClose($client) {//вызывается при закрытии соединения клиентом } protected function onMessage($client, $data) {//вызывается при получении сообщения от клиента $data = $this->decode($data); if (!$data['payload']) { return; } if (!mb_check_encoding($data['payload'], 'utf-8')) { return; } //var_export($data); //шлем всем сообщение, о том, что пишет один из клиентов $message = 'пользователь #' . intval($client) . ' (' . $this->pid . '): ' . strip_tags($data['payload']); $this->send($message); $this->sendHelper($message); } protected function onSend($data) {//вызывается при получении сообщения от мастера $this->sendHelper($data); } protected function send($message) {//отправляем сообщение на мастер, чтобы он разослал его на все воркеры @fwrite($this->master, $message); } private function sendHelper($data) { $data = $this->encode($data); $write = $this->clients; if (stream_select($read, $write, $except, 0)) { foreach ($write as $client) { @fwrite($client, $data); } } } } $config = array( 'host' => '0.0.0.0', 'port' => 8000, 'workers' => 1, ); $WebsocketServer = new WebsocketServer($config); $WebsocketServer->start(); Update (лучшее из комментариев): на одно соединение уходит около 9кб памяти если при работе с открытыми сокетами использовать fgets(), то можно получить «зависание» (функция будет ожидать конца строки или таймаута), потому что по протоколу вебсокета сообщение не заканчивается переносом строки. используйте fread() при записи ответа в сокет функцией fwrite() (функция возвращает количество записанных байт) необходимо проверять, что в сокет были записаны все данные при ответе с сервера нужно проверять готовность клиента получать данные при помощи функции stream_socket_accept() если на сервере в сокет писать символы, отсутствующие в utf-8, то клиент будет разрывать соединение с ошибкой: WebSocket connection to 'ws://sharoid.ru:8000/' failed: Could not decode a text frame as UTF-8. при проверке, что клиент не прислал никаких данных и нужно закрывать сокет используйте !strlen($data), а не !$data перед сервером вебсокетов можно поставить nginx |
|||||||
Так же в этом разделе:
|
|||||||
|
|||||||
|