MyTetra Share
Делитесь знаниями!
Делаем вебсокеты на PHP с нуля. Часть 2. IPC
Время создания: 07.05.2018 22:33
Автор: https://habr.com/users/morozovsk/
Текстовые метки: websocket
Раздел: WebSocket
Запись: Velonski/mytetra-database/master/base/1525714382dsmovx4k7a/text.html на raw.githubusercontent.com

После написания моей предыдущей статьи Делаем вебсокеты на PHP с нуля я понял, что у сообщества есть некоторый интерес к поднятой мною теме.


В прошлой статье я обещал, что опишу:

запуск нескольких процессов для обработки соединений

межпроцессное взаимодействие

разделение процессов мастер-воркер

проксирование вебсокетов с помощью nginx

запуск из консоли

интеграция с вашим фреймворком на примере yii

демонстрация



И, как обычно, — получившийся код и ссылка на демонстрационный чат в конце статьи.


Запуск нескольких процессов для обработки соединений


Для работы простого сервера вебсокетов достаточно одного процесса, но чтобы увеличить количество одновременных соединений (и обойти ограничение 1024 одновременных соединения), а также для использования ресурсов всего процессора (а не только одного ядра), необходимо, чтобы сервер вебсокетов использовал несколько процессов (оптимально — количество процессов = количество ядер процессора).


Для запуска нескольких процессов мы будем использовать функцию pcntl_fork(). Она создаёт новый процесс (дочерний), который является практически полной копией процесса-родителя, выполняющего этот вызов.

После вызова pcntl_fork() алгоритм разветвляется: в случае успешного выполнения функции pcntl_fork() она возвращает PID дочернего процесса родительскому, а NULL дочернему. Если создание форка закончилось неудачей, функция pcntl_fork() возвращает значение −1).


$pid = pcntl_fork(); //делаем форк


//далее весь код будет выполняться в обоих процессах


if ($pid == -1) {

// Не удалось создать дочерний процесс

} elseif ($pid) {

// Этот код выполнится родительским процессом

} else {

// А этот код выполнится дочерним процессом, его PID можно узнать с помощью функции getmypid()

}



Про отличие родительского процесса от дочернего можно почитать на википедии.


Мы можем в цикле создавать столько дочерних процессов, сколько нам необходимо:


$childs = array();


for ($i=0; $i<4; $i++) {

$pid = pcntl_fork(); //создаём форк


if ($pid == -1) {

die("error: pcntl_fork");

} elseif ($pid) { //родительский процесс

$childs[] = $pid; //заполняем массив дочерними PID, они нам ещё пригодятся :)

} else { //дочерний процесс

break; //выходим из цикла, чтобы дочерние процессы создавались только из родителя

}

}



Межпроцессное взаимодействие


Для взаимодействия между родительским и дочерним процессом мы будем использовать сокеты, а именно связанные сокеты:

Функция stream_socket_pair() создаёт пару связанных неразличимых потоковых сокетов. Таким образом мы можем писать в один сокет, а считывать данные из второго.


$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); //получаем массив из связанных сокетов


fwrite($pair[0], 'тест'); //пишем в первый сокет

fread($pair[1], mb_strlen('тест')); //читаем из второго



Теперь совмещаем этот код с форками и получаем:


$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); //создаём связанные сокеты


$pid = pcntl_fork(); //делаем форк


//далее весь код будет выполняться в обоих процессах


if ($pid == -1) {

die("error: pcntl_fork");

} elseif ($pid) { //родительский процесс

fclose($pair[0]); //закрываем один из сокетов в родителе

$child = $pair[1]; //второй будем использовать для связи с потомком

} else { //дочерний процесс

fclose($pair[1]); //закрываем второй из сокетов в потомке

$parent = $pair[0]; //первый будем использовать для связи с родителем

}



Итоговый код для создания множества дочерних процессов:


$parent = null;

$childs = array();


for ($i=0; $i<5; $i++) {

$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); //создаём связанные сокеты


$pid = pcntl_fork(); //создаём форк


if ($pid == -1) {

die("error: pcntl_fork");

} elseif ($pid) { //родительский процесс

fclose($pair[0]); //закрываем один из сокетов в родителе

$childs[] = $pair[1]; //второй будем использовать для связи с потомком

} else { //дочерний процесс

fclose($pair[1]); //закрываем второй из сокетов в потомке

$parent = $pair[0]; //первый будем использовать для связи с родителем

break; //выходим из цикла, чтобы дочерние процессы создавались только из родителя

}

}


В результате работы этого кода в родителе массив $childs будет содержать в себе все сокеты для связи с потомками, а потомки для связи с родителем будут использовать $parent.


Разделение процессов на мастера и воркеров


Так как дочерние процессы в нашей реализации не связаны друг с другом напрямую и могут взаимодействовать только через родителя, то целесообразно разделение обязанностей между родителем и потомками:

родитель будет отвечать за взаимодействие между дочерними процессами (будет мастером)

дочерние процессы будут выполнять всю работу (будут воркерами)


Также воркер у нас будет заниматься пересылкой сообщений из скриптов со страниц сайта или из крона. Для этого мы создадим дополнительный сокет, и добавим его в массив, прослушиваемых сокетов. Например, можно создать unix-сокет:

$service = stream_socket_server('unix:///tmp/websocket.sock', $errorNumber, $errorString);



Проксирование вебсокетов с помощью nginx


Nginx поддерживает проксирование вебсокетов начиная с версии 1.3.13. Благодаря nginx можно обрабатывать соединения к серверу вебсокетов на том же порту, что и сайт, а также ограничить количество открытых вебсокетов с одного ip и другие полюбившиеся вам плюшки.


Пример nginx-конфига, который это позволяет:

limit_conn_zone $binary_remote_addr zone=perip:10m;


server {

listen 5.135.163.218:80;

server_name sharoid.ru;


location / {

limit_conn perip 5; #делаем ограничение 5 вебсокетов на 1 ip

proxy_pass http://127.0.0.1:8000;

proxy_http_version 1.1;

proxy_set_header Upgrade $http_upgrade;

proxy_set_header Connection "upgrade";

proxy_read_timeout 3600s; #увеличиваем таймаут для вебсокетов

}

}



Запуск из консоли


Выполняем команду php websocket.php или ./websocket.php (предварительно дав права на выполнение)

Если использовать nohup, например, nohup ./websocket.php &, то скрипт продолжит работать после закрытия консоли.


По-умолчанию есть два ограничения количества соединений на один процесс.

Первое — на уровне операционной системы: в одном процессе нельзя открыть более чем 1024 соединения. Чтобы обрабатывать больше одновременных соединений, выполните команду: ulimit -n 65535, а если у пользователя недостаточно привилегий, то sudo sh -c "ulimit -n 65535 && exec su $LOGNAME". Текущее значение можно посмотреть используя команду ulimit -n

Второе — у функции stream_select(): она не принимает больше чем 1024 соединения. Здесь всё сложнее — нужно перекомпилировать php c увеличенным FD_SETSIZE


Как я уже писал, эти ограничения можно обойти, используя дочерние процессы (воркеры).


Интеграция с вашим фреймворком на примере yii


Так как наш мастер прослушивает дополнительный сокет для связи с нашими скриптами (в примере выше был unix:///tmp/websocket.sock), мы можем в любом месте нашего сайта или в кроне соединиться с этим сокетом и отправить сообщение, которое мастер разошлёт всем воркерам, а они, в свою очередь, все клиентам:

$service = stream_socket_client ('unix:///tmp/websocket.sock', $errno, $errstr);

fwrite($service, 'всем привет');


С использованием компонента yii это будет выглядеть вот так:

Yii::app()->websocket->send('всем привет');


Подробнее для yii

Скачиваем экстеншн, кладём его в папку extensions/websocket

В папку components кладём Websocket.php, WebsocketMasterHandler.php и WebsocketWorkerHandler.php из папки sample/yii.

В папку commands кладём из WebsocketCommand.php из папки sample/yii.

В конфиги main.php и console.php вставляем в секцию components:

'websocket' => array(

'class' => 'Websocket',

//'websocket' => 'tcp://127.0.0.1:8000',

//'localsocket' => 'tcp://127.0.0.1:8001',// unix:///tmp/mysock

//'workers' => 1

),


В конфиг console.php также вставляем в секцию import:

'ext.websocket.*'




Демонстрация


Демонстрационный чат 2.0 (добавлен список пользователей, добавлено ограничение: 1 сообщение в секунду с одного IP)

В нём были использованы описанные выше функции, а также исправлены недостатки, выявленные после публикации предыдущей статьи.

Демонстрационный чат 1.0 (без списка пользователей, без ограничений)


Все исходники я оформил в виде библиотеки и выложил на github


Update: Если сообществу интересна эта тема, то следующая статья будет про то как сделать простую игру, в которой все участники будут находиться на одном игровом поле и взаимодействовать друг с другом в реальном времени (демка уже почти готова).

Так же в этом разделе:
 
MyTetra Share v.0.67
Яндекс индекс цитирования