Розбираємося з основною термінологією та реалізуємо найпростіший приклад взаємодії з брокером повідомлень за 10 хвилин.
В результаті у нас буде дві програми: одна буде відправляти повідомлення, а інша їх отримувати та виводити в термінал.
Hello, Укрпошта
Якщо трохи спростити, то RabbitMQ можна уявити як поштову скриньку, поштове відділення, сортувальний центр та листоношу одночасно.
Передача. Як Укрпошта відповідає за доставку листів від відправника до одержувача, так і брокер забезпечує передачу повідомлень між продюсером (producer) та споживачем (consumer).
Зберігання. Листи тимчасово зберігаються в поштових відділеннях та сортувальних центрах, поки не будуть доставлені адресату. Схожим чином, брокер повідомлень використовує черги (queue) для буферизації повідомлень до їх обробки споживачем.
Адресація. Поштова служба визначає найкращий маршрут доставки на основі адреси одержувача. RabbitMQ використовує теми (topic) та точки обміну (exchange) для маршрутизації повідомлень.
Привіт, RabbitMQ
RabbitMQ — це брокер повідомлень: він виступає посередником між іншими компонентами і реалізує середовище для асинхронної комунікації.
Відходячи від теми пошти, проговоримо терміни, які нам будуть потрібні:
Producer (продюсер) відправляє повідомлення.
Queue (черга) - місце, де повідомлення зберігаються. З одною чергою може працювати безліч продюсерів та споживачів.
Consumer (споживач) отримує повідомлення.
Exchange (точка обміну) ми обговоримо іншим разом.
Продюсер та споживач - це застосунки, які пишуть розробники і вони не є частиною RabbitMQ.
Для чого брокер повідомлень взагалі може стати в нагоді:
Асинхронна обробка. Особливо корисно для веб-сервісів: важкі операції можна виконати поза циклом запит-відповідь.
Зниження зв'язності (coupling) та обмеження знань про систему. Відправнику повідомлень не потрібно нічого знати про отримувача і навпаки.
Паралельна обробка. Повідомлення можуть оброблятися великою кількістю отримувачів.
Запобігання втраті інформації. Брокер може використовувати механізм підтвердження як для доставки, так і для відправлення.
Зменшити навантаження на downstream сервіс. Відправник передає повідомлення брокеру, вони попадають в чергу, а споживач може отримувати їх в тій кількості і з тією швидкістю, яка йому підходить.
Вмикаємо таймер
Запуск RabbitMQ
Я вважаю, що найкращий інструмент для експериментів - Docker.
Порівняйте самі:
Встановити Erlang
Встановити RabbitMQ (можна скачати з Github)
Активувати плагін керування
З іншого боку, достатньо ввести в термінал:
docker run -it --rm -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
Результат однаковий, тільки в першому випадку, у вас в системі з'явиться багато лишнього. Принаймні, якщо не працюєте з Erlang.
Отож, після запуску, нас чекає сторінка входу в панель керування.
Важливо: під ім’ям guest
можна зайти лише в локально установлений RabbitMQ.
Підготовка PHP
RabbitMQ підтримує кілька протоколів, але ми зупинимося на AMQP 0-9-1. Для PHP цей протокол реалізований в пакеті php-amqplib/php-amqplib
.
Перед його встановленням, потрібно активувати розширення PHP для роботи з сокетами: php-sockets
.
Під Windows, потрібно розкоментувати в php.ini
таку стрічку:
extension=php_sockets.dll
Під Linux іншу:
extension=sockets.so
Нарешті, встановлюємо php-amqplib
з допомогою Composer:
composer require php-amqplib/php-amqplib
Якщо у вас встановилася версія 2.8
цього пакету, це означає, що php-sockets
не увімкнувся. Подальші приклади будуть для версії 3.6, тому спробуйте ще раз. Якщо будуть складнощі - пишіть в коментарях.
Реалізація продюсера
Створимо простенький скрипт. Він буде читати стрічки, які ми йому введемо в термінал і відправляти їх брокеру.
В файлі sender.php
підключаємо Composer Autoloader та імпортуємо кілька класів:
require_once __DIR__ . '/vendor/autoload.php';
// клас для роботи з підключеням
use PhpAmqpLib\Connection\AMQPStreamConnection;
// клас повідомлення
use PhpAmqpLib\Message\AMQPMessage;
І підключаємося до локального серверу:
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
Якщо ви, як і я, не любите писати те, чого не розумієте: тут $connection
являє собою TCP з'єднання з брокером, а канал $channel
— це віртуальне з'єднання AMQP всередині TCP.
Всю подальшу роботу будемо виконувати через об'єкт каналу.
Якщо сервер знаходиться на іншому комп'ютері, то треба вказати його IP.
Важливо: Для нашого експерименту не критично, але враховуючи цикл життя запиту в PHP, дуже рекомендується використовувати проксі, бо багато часу буде йти на підключення до брокера.
Так, наприклад, розробники AMQProxy пишуть, що різниця може бути більш ніж на порядок: 10мс при роботі з проксі та 500мс без нього. Майте на увазі.
Щоб відправити повідомлення нам потрібна черга.
Назвемо її my-queue
і оголосимо:
// нічого не повертає, в подальшому використовується назва
$channel->queue_declare('my-queue');
Ця дія є ідемпотентною, тому її можна виконувати скільки завгодно: черга буде створена лише у випадку, якщо її ще не існує.
Тепер основа скрипту. Тут ми чекаємо на нову стрічку від користувача, створюємо повідомлення (AMQPMessage
) та публікуємо його в чергу my-queue
:
echo 'Натисни "Enter" щоб відправити повідомлення, "Ctrl+C" щоб вийти' . PHP_EOL;
while (true) {
$input = readline("Повідомлення: ");
$message = new AMQPMessage($input);
$channel->basic_publish($message, routing_key: 'my-queue');
echo ' ✔️ Відправлено' . PHP_EOL;
}
В кінці, гарно було б закрити канал та з'єднання:
$channel->close();
$connection->close();
Але у нас цей код працювати не буде, бо вихід реалізовано через Ctrl+C
.
Реалізація споживача
Починаємо точно так само. Імпорт, з'єднання, канал:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
Як не дивно, а нам знову треба створити чергу:
$channel->queue_declare('my-queue'); // код такий самий як і у продюсера
Для чого це робити? Бо споживача може бути запущено першим і тоді він не зможе знайти чергу:
PHP Fatal error: Uncaught PhpAmqpLib\Exception\AMQPProtocolChannelException: NOT_FOUND - no queue 'my-new-queue' in vhost '/'...
Наступним кроком, нам потрібно зареєструвати споживача:
$channel->basic_consume(
'my-queue', // черга, яку будемо слухати
no_ack: true, // відключаємо ручне підтвердження - про це іншим разом
// функція, що буде опрацьовувати отримані повідомлення
callback: function (AMQPMessage $message) {
echo '✉️ Отримано: ' . $message->body . PHP_EOL;
},
);
Тут важливо згадати, що повідомлення надходять асинхронно, тому потрібно ще створити цикл, який буде чекати на повідомлення.
На щастя, він уже реалізований в php-amqplib
, а нам залишається викликати метод consume()
:
echo 'Чекаю на повідомлення. Натисни "Ctrl+C" щоб вийти' . PHP_EOL;
try {
$channel->consume(); // ось тут ми чекаємо
} catch (Throwable $e) {
echo $e->getMessage() . PHP_EOL;
}
Зупиняємо таймер
Тепер треба перевірити, що все працює. Якщо не працює, то не страшно, бо:
Програмування — це не написання коду, це пошук причини, чому він не працює.
— Незнаю Хто
Запускаємо наш продюсер:
php .\sender.php
Потім споживач:
php .\receiver.php
І спілкуємося з собою.
Також, можна зайти в панель керування http://127.0.0.1:15672/ та походити по вкладках.
Вітаю, ми закінчили!
Пишіть в коментарях, скільки часу вам знадобилося 😉