Rabbitmq для чего нужен
RabbitMQ. Часть 1. Introduction. Erlang, AMQP
Добрый день, Хабр! Хочу поделиться учебником-справочником знаний, которые мне удалось собрать по RabbitMQ и сжать в короткие рекомендации и выводы.
Оглавление
Кратко про AMQP
AMQP (Advanced Message Queuing Protocol) — открытый протокол для передачи сообщений между компонентами системы. Основная идея состоит в том, что отдельные подсистемы (или независимые приложения) могут обмениваться произвольным образом сообщениями через AMQP-брокер, который осуществляет маршрутизацию, возможно гарантирует доставку, распределение потоков данных, подписку на нужные типы сообщений.
Протокол AMQP вводит три понятия:
Протокол работает поверх TCP/IP.
Кратко про Erlang
Исходный код проекта находится в репозитории на GitHub. Архитектура RabbitMQ-server основана на Erlang и BEAM.
Кратко про RabbitMQ
Основная идея модели обмена сообщениями в RabbitMQ заключается в том, что producer (издатель) не отправляет сообщения непосредственно в очередь. На самом деле и довольно часто издатель даже не знает, будет ли сообщение вообще доставлено в какую-либо очередь.
Вместо этого издатель может отправлять сообщения только на обмен. С одной стороны, обмен получает сообщения от издателей, а с другой — отправляет их в очереди. Обмен должен точно знать, что делать с полученным сообщением. Должно ли оно быть добавлено в определенную очередь? Должно ли оно быть добавлено в несколько очередей? Или сообщение нужно игнорировать.
Кратко работу RabbitMQ можно описать следующим образом:
Подключение и каналы
Для такого обмена информацией между клиентом и сервером используются каналы. Каналы создаются в рамках определенного подключения. Каждый канал изолирован от других каналов. В синхронном случае не возможно выполнять следующую команду, пока не получен ответ.
Для того чтобы иметь возможность отправлять команды параллельно приходится открывать несколько каналов. Каждый канал создает отдельный Erlang процесс. Одно подключение может иметь множество каналов (multiplexing). Для каждого канала существуют некие структуры и объекты в памяти. Поэтому чем больше каналов имеется в рамках соединения, тем больше памяти использует RabbitMQ для управления таким соединением.
Простой пример создания подключения и канала при помощи RabbitMQ.Client:
Открывать новое соединение для каждой операции, настоятельно не рекомендуется, поскольку это приведет к большим затратам. Каналы также должны быть постоянными, но многие ошибки протокола приводят к закрытию канала, поэтому срок службы канала может быть короче, чем у соединения.
Где используется RabbitMQ?
В контексте микросервисов протокол AMQP и его реализацию в RabbitMQ часто используют для асинхронного взаимодействия между сервисами.
В контексте IIOT протокол AMQP и его реализацию в RabbitMQ используют для обмена данными между серверами (сервер-сервер). Также используют плагин MQTT Plugin RabbitMQ являющегося реализацией протокола MQTT для передачи данных между датчиком и сервером в низкоскоростных средах с высокой задержкой (полный перечень поддерживаемых протоколов перечислен на сайте проекта).
В следующей статье начнем разбираться подробнее с Exchanges.
RabbitMQ tutorial 1 — Hello World
RabbitMQ позволяет взаимодействовать различным программам при помощи протокола AMQP. RabbitMQ является отличным решением для построения SOA (сервис-ориентированной архитектуры) и распределением отложенных ресурсоемких задач.
Под катом перевод первого из шести уроков официального сайта. Примеры на python, но его знание вовсе не обязательно. Аналогичные примеру программы можно воспроизвести практически на любом популярном ЯП. [так выглядят комментарии переводчика, т.е. меня]
Вступление
RabbitMQ ‒ это брокер сообщений. Его основная цель ‒ принимать и отдавать сообщения. Его можно представлять себе, как почтовое отделение: когда Вы бросаете письмо в ящик, Вы можете быть уверены, что рано или поздно почтальон доставит его адресату [видимо, автор ни разу не имел дела с Почтой России]. В этой аналогии RabbitMQ является одновременно и почтовым ящиком, и почтовым отделением, и почтальоном.
Наибольшее отличие RabbitMQ от почтового отделения в том, что он не имеет дела с бумажными конвертами ‒ RabbitMQ принимает, хранит и отдает бинарные данные ‒ сообщения.
В RabbitMQ, а также обмене сообщениями в целом, используется следующая терминология:
Поставщик, подписчик и брокер не обязаны находиться на одной физической машине, обычно они находятся на разных.
Hello World!
Первый пример не будет особо сложным ‒ давайте просто отправим сообщение, примем его и выведем на экран. Для этого нам потребуется две программы: одна будет отправлять сообщения, другая ‒ принимать и выводить их на экран.
Общая схема такова:
Поставщик отправляет сообщения в очередь с именем «hello», а подписчик получает сообщения из этой очереди.
Библиотека RabbitMQ
RabbitMQ использует протокол AMQP. Для использования RabbitMQ необходима библиотека, поддерживающая этот протокол. Такие библиотеки можно найти практически для каждого языка программирования. Python ‒ не исключение, для него есть несколько библиотек:
Отправка сообщений
Наша первая программа send.py будет просто отправлять одно сообщение в очередь.
Мы подключились к брокеру сообщений, находящемуся на локальном хосте. Для подключения к брокеру, находящемуся на другой машине, достаточно заменить «localhost» на IP адрес этой машины.
Перед отправкой сообщения мы должны убедиться, что очередь, получающая сообщение, существует. Если отправить сообщение в несуществующую очередь, RabbitMQ его проигнорирует. Давайте создадим очередь, в которую будет отправлено сообщение, назовем ее «hello»:
Теперь все готово для отправки сообщения. Наше первое сообщение будет содержать строку и будет отправлено в очередь с именем «hello».
Вообще, в RabbitMQ сообщения не отправляются непосредственно в очередь, они должны пройти через exchange (точка обмена). Но сейчас мы не будем заострять на этом внимание, точки обмена будут рассмотрены в третьем уроке. Сейчас достаточно знать, что точку обмена по-умолчанию можно определить, указав пустую строку. Это специальная точка обмена ‒ она позволяет определять, в какую именно очередь отправлено сообщение. Имя очереди должно быть определено в параметре routing_key:
Перед выходом из программы необходимо убедиться, что буфер был очищен и сообщение дошло до RabbitMQ. В этом можно быть уверенным, если использовать безопасное закрытие соединения с брокером.
Получение сообщений
Наша вторая программа receive.py будет получать сообщения из очереди и выводить их на экран.
Также как и в первой программе сначала необходимо подключиться к RabbitMQ. Для этого следует использовать тот же код, как и ранее. Следующий шаг, как и прежде ‒ убедиться, что очередь существует. Команда queue_declare не будет создавать новую очередь, если она уже существует, поэтому сколько бы раз не была вызвана эта команда, все-равно будет создана только одна очередь.
Вы можете задаться вопросом, почему мы объявляем очередь снова, ведь она уже была объявлена в первой программе. Это нужно, чтобы быть уверенным в существовании очереди, так будет, если сначала будет запущена программа send.py. Но мы не знаем, какая программа будет запущена раньше. В таких случаях лучше объявить очередь в обеих программах.
Мониторинг очередей
Если Вы хотите посмотреть, какие очереди существуют в RabbitMQ на данный момент, Вы можете сделать это с помощью команды rabbitmqctl (потребуются права суперпользователя):
(для Windows ‒ без sudo)
[в нашей компании используют более удобный скрипт мониторинга:]
[скрипт выводит и обновляет каждые 2 секунды таблицу со списком очередей: имя очереди; количество сообщений в обработке; количество сообщений готовых к обработке; общее количество сообщений; устойчивость очереди к перезагрузке сервиса; является ли временной очередью; количество подписчиков]
Получение сообщений из очереди более сложный процесс, чем отправка. Получение осуществляется при помощи подписки с использованием callback функции. При получении каждого сообщения библиотека Pika вызывает эту callback функцию. В нашем примере она будет выводить на экран текст сообщения.
Далее, нам нужно обозначить, что callback функция будет получать сообщения из очереди с именем «hello»:
Здесь необходимо быть уверенным в том, что очередь, на которую мы хотим подписаться, была объявлена. Мы сделали это ранее с помощью команды queue_declare.
Параметр no_ack будет рассмотрен позже [во втором уроке].
И, наконец, запуск бесконечного процесса, который ожидает сообщения из очереди и вызывает callback функцию, когда это необходимо.
Ну а теперь все вместе
Полный код receive.py:
Теперь мы можем попробовать запустить наши программы в терминале. Сначала отправим сообщение при помощи программы send.py:
Выполнение этой программы будет завершаться после отправки каждого сообщения. Теперь сообщение нужно получить:
Отлично! Мы отправили наше первое сообщение через RabbitMQ. Как Вы могли заметить, выполнение программы receive.py не завершилось. Она будет ожидать следующих сообщений, а остановить ее можно, нажав Ctrl+C.
Попробуйте запустить send.py снова в новом окне терминала.
Мы изучили, как отправлять и получать сообщения через именованные очереди. В следующем уроке мы создадим простую очередь задач [ресурсоемких].
UPD: библиотеку, работающую с RabbitMQ, для своего любимого ЯП Вы можете найти на официальном сайте тут.
Что такое RabbitMQ, зачем он нужен и как его использовать
Около полугода назад на одном проекте мы с напарником столкнулись с проблемой масштабирования, которая в тот момент внезапно ударила по серверу и весело его уронила. Количество задач, которые ставили пользователи, превысило барьеры вычислительных мощностей. Факторов, которые к этому привели, было несколько:
Проблема была решена — был арендован второй сервер, куда были перенесены соответствующие скрипты и с помощью небольшого API делегирована часть задач. Но в момент, когда мы искали решение и думали, что же лучше: по запросу передавать JSON-массив или что-то другое, нашли много интересной информации. В частности google выдавал что-то про «брокеры сообщений», «очереди», какого-то кролика… Причём тут кролик я тогда не понял, чтиво про брокеры — бросил, подумав, что это слишком сложно. Но через некоторое время появилась задача делегировать уже другие данные на ещё один сервер, и тут, на одном из форумов, мне уже явно посоветовали покурить в сторону злополучного кролика — RabbitMQ. К слову, задачу я решил, RabbitMQ оказался не таким и сложным (вернее — его конфигурация), а решение моей конкретной задачи заняло весьма немного кода. Так что же такое RabbitMQ?
RabbitMQ — это платформа позволяющая обмениваться сообщениями. Что может быть в сообщении — решать только тебе. Обмениваться можно как на одном сервере, так и с одного на другой. Это отличный способ масштабирования, так как с хорошо настроенным RabbitMQ мы можем просто подключать новые сервера, настроив на них нужное ПО и прописав конфигурацию, а RabbitMQ будет сам делегировать работу между всеми серверами. На официальном сайте есть мануалы по основным способам использования кролика — ознакомься. Я не буду описывать основные сущности, которые есть в RabbitMQ и на основе который строится протокол AMQP — это можно легко найти в сети.
Я сегодня рассмотрю, как интегрировать RabbitMQ в Symfony 4.1 на примере двух серверов — один будет отправлять сообщения, а другой получать. Какая актуальность данной статьи, если подобные маны уже есть? Ну по-первых, она на русском. Во-вторых, есть несколько ошибок, которые крайне сложно загуглить и решаются они методом научного тыка — вот их-то я и опишу.
У нас есть 2 сервера (хотя все примеры можно проделать и на одном, честно говоря) — один собирает задачи, другой их выполняет. Задачи могут быть любые — на твой вкус и цвет: от отправки email-сообщений, до обработки изображений. Можешь просто выводить данные на экран. Итак, на одном сервере у тебя идёт постановка задач, а на другом — обработка этих задач.
Сначала я показываю все действия на сервере, который собирает задачи.
Для начала — нужно установить бандл, который является обёрткой над библиотекой, реализующей протокол AMQP:
Ошибок быть не должно, а если и будут — внимательно читай вывод и ставь библиотеки, которые будут указаны после слов, похожих на слово require. Или пиши в комментариях — я с радостью помогу.
Дальше нужно установить rabbitmq-server. Да, у кролика есть свой полноценный сервер, который разворачивается на одном из серверов, если мы рассматриваем обмен между двумя серверами. Тут нужно подумать, что у тебя будет сервером, а что клиентом. Так как обрабатывает задачи (а значит и выбрасывает их в очередь) сервер, с которым мы сейчас работаем — будет логично установить rabbitmq-server на него Если я тут не прав — пиши в комментариях. Ставим:
И тут у тебя может появится первая проблема, об которую ты не слабо разобьёшь лоб:
rabbitmq/bin/rabbitmq-server-wait (code=exited, status=70)
Спустя почти час гугления я пришёл к выводу, что rabbitmq-server не запускается, собака такая, если в файле /etc/hostname у тебя прописано имя с точками. А мне поставили сервер, где в этом имени были точки… Точки эта штука распознавать не умеет.
Теперь добавляем пользователя и выдаём ему права, с помощью rabbitmqcli:
Первая строка — добавляем пользователя и устанавливаем ему пароль. Главное, измени username и password на свои данные. Далее ставим пользователю категорию «администратор». Затем, выдаём доступ к просмотру всего и вся. Я записал эти параметры в файл config/services.yaml:
Теперь нужно добавить сущность, с которой будет общаться rabbitmq-server — виртуальный хост, через который будет настраиваться конкретный обмен между конкретным постановщиком задач — producer (пишет в очередь) и потребителем задач — consumer (читает из очереди). Я не знаю почему, но в мануалах по интеграции Symfony и RabbitMQ этот пункт пропускают, а искать его самому и думать в чём ошибка — это дополнительное время. Добавляем:
username — это тот самый пользователь, созданный выше. my_project — имя хоста. Добавляем в параметры:
Идём дальше. А дальше нам нужно настроить конфиги. У тебя должен был появиться файл config/packages/old_sound_rabbit_mq.yaml — открывай его и добавляй:
Давай разберёмся. Я надеюсь, что ты ознакомился с примерами на официальном сайте и знаешь, что producer — публикует сообщения, exchange — определяет, в какую (если их несколько) очередь сообщение выбросить. port у rabbitmq по-умолчанию 5672 — таким и оставляем. Теперь нам нужно создать exchange, делаем это с помощью команды:
Всё успешно? Отлично, мы настроили rabbitmq на сервере, который будет писать в очередь. У кролика есть крутой интерфейс — зайди на ip сервера через порт 15672, увидишь нечто следующее:
Входи под данными, которые были созданы чуть выше и увидишь графики, метрики и прочую интересную штуку.
Теперь переходим на сервер, который будет читать сообщения. Тут всё гораздо проще. Сначала ставь бандл, затем открывай файл конфига и пиши:
Ты уже понял, что параметры нужно определить в config/services.yaml в секции параметры. Имя пользователя и пароль указывай те, который были созданы выше. ip_main_server — это ip сервера, с которым мы работали выше.
Далее настраивается consumer. Параметр exchange_options имеет значение, которые мы создавали выше.
Параметр queue_options — это имя очереди. Можешь задать любое, она будет создана автоматически.
callback — это имя сервиса, который будет вызывать класс, обрабатывающий каждое новое сообщение из очереди. Давай его определим. Иди в файл config/services.yaml и добавляй:
Как ты догадался, нужно создать этот класс. Создаётся он по определённому шаблону — его нужно унаследовать от класса Command, который нам любезно предоставляет RabbitMQ Bundle.
Просто выводим тело письма, которое пришло. Я тестировал на массивах, поэтому — print_r. Теперь запускаем consumer в режим ожидания сообщений:
И теперь с сервера, где мы устанавливали producer, отправим первое сообщение. Создаём простой контроллер:
Переходи на /check_data_test и наслаждайся — ты отправил и получил первое сообщение
Читайте также
Я в своей жизни ещё ни разу не встречал проекта, где бы всё было сделано по правилам проектирования архитектуры, с…
Периодически при работе с консолью Linux нужно пронаблюдать за изменением вывода какой-либо команды с промежутком в несколько секунд. Для этого…
Как регламентировать перекуры в течение рабочего дня? Можно ли разрешать опаздывать к началу рабочего дня? Можно ли чатится во время…
1 Comment
andre · 2021-06-21 at 16:00
Блог Makeomatic: разработка сайтов и мобильных приложений
RabbitMQ: Простая и эффективная очередь сообщений
что такое RabbitMQ и его применение
В этой статье рассмотрим, как работает RabbitMQ, а также как и для чего её можно использовать в проектах на Node.js
Очереди сообщений
Что такое очередь сообщений ( message queue )? Это некая структура данных, которая обеспечивает хранение и передачу двоичных данных ( blobs ) между различными участниками системы. Очереди сообщений практически всегда используются в крупных системах, благодаря важным преимуществам.
Учитывая огромную важность очередей для надёжных и гибких систем обработки данных, была даже разработана спецификация протокола — AMQP, на основе которой разрабатывается несколько приложений, выполняющих функцию очереди — так называемых «брокеров». Аналогия с биржевыми процессами будет прослеживаться и в дальнейшем. Мы рассмотрим брокер RabbitMQ, авторами которого и создан протокол AMQP.
Почему RabbitMQ?
Причин несколько, но одна из основных — реализация приложения на платформе Erlang/OTP, гарантирующая максимальную стабильность и масштабируемость очереди, как ключевого узла всей системы. Другая причина — полная открытость приложения, распространяющегося по лицензии Mozilla Public License и реализация открытого протокола AMQP, библиотеки для которого существуют во всех основных языках и платформах программирования. В том числе и для Node.js
Основные понятия
Брокер
Под брокером мы будем понимать сам сервер RabbitMQ. Брокер может быть один, брокеров может быть несколько, объединённых в общий кластер. Брокер занимается непосредственно передачей сообщений. Однако на внутреннем уровне происходит намного больше процессов, нежели просто передача байтиков по сети.
Очередь
Очередь — основной логический компонент брокера. Именно из очереди клиент ( consumer ) забирает сообщения. Другое дело, что очередь не единственный участник обмена.
Биржа
Биржа (exchange, иногда переводится как «обмен») играет важнейшую роль в направлении сообщений от отправителя ( producer ) к клиенту (consumer, он же потребитель). Дело в том, что именно благодаря бирже, поступающее от отправителя сообщение направляется в нужную очередь. Кроме того, у сообщения может присутствовать метка ( routingKey ) (ключ м
Это наиболее важная строчка, в которой мы сообщаем брокеру, что сообщение было принято, полностью обработано и его можно безопасно удалить из очереди. Если такое подтверждение брокеру не отправить, то сообщение никогда не будет удалено из очереди и постепенно брокер заполнит всю оперативную память сервера. Будьте внимательны — это одна из самых частых ошибок при работе с очередью.
Важно: отправляйте ack только когда сообщение действительно полностью обработано и его можно удалить. Это будет гарантировать две вещи:
Важно: если сообщение обработать невозможно по техническим или каким-то другим причинам у вас есть два варианта.
Обратите внимание: брокер по-умолчанию сам распределяет нагрузку между клиентами, вам ничего не нужно для этого делать. Один у вас клиент, или пятьдесят — брокеру всё равно.
Publish-Subscribe (он же Broadcast)
Никто не запрещает отправлять сообщения сразу всем клиентам, а не по алгоритму round-robin. Это позволяет использовать очередь в качестве сервера pubsub. Всё, что для этого нужно сделать — определить биржу типа fanout. Делается это в вызове assertExchange :
Как видно, тип биржи передаётся вторым параметром. Поменяйте код отправителя и клиентов (помните, что определения бирж и очередей должны совпадать), как показано выше и попробуйте запустить несколько клиентов. Посмотрите, как будут распределяться сообщения теперь.
Всего одно маленькое дополнение — и совершенно изменившийся алгоритм работы. Как видите, для того, чтобы менять поведение брокера, вовсе не нужно лезть в глубокие настройки сервера. Достаточно слегка поменять код.
Маршрутизация по шаблону
Метка должна содержать несколько слов, разделённых точкой. Например: «a.b» или «animals.feline.tiger». Должна присутствовать по крайней мере одна точка. Максимальный размер метки — 255 байт. Обратите внимание: не символов, байт. Если вы используете символы Unicode, то имейте это ввиду.
Существует два особых знака, которые используются в routingKey при привязке очереди к бирже по метке (и только тогда, но не при отправке!):
Позволит нам принимать все сообщения о животных из семейства кошачьих, не имеющих подвидов.
Ну а следующее сообщение будет получено клиентом, который добавил вышестоящую привязку:
Зато такое сообщение им принято не будет:
Remote Procedure Call
Иногда возникает потребность передать сообщение обработчику И дождаться ответа. Этот сценарий описывает систему «удалённого вызова процедур». Такая система тоже вполне может быть построена с помощью RabbitMQ. Посмотрим, как это сделать.
Клиент
На клиенте всё очень просто: в вызов publish добавляется специальная опция replyTo, значением которой является имя очереди, в которой клиент будет ожидать ответ. Обратите внимание, что в данном случае клиент обращается к серверу именно через publish, поскольку он хочет вызвать удалённую процедуру, находящуюся на сервере. В данном сценарии отправителем будет являться клиент, а потребителем — сервер. Затем их роли поменяются местами, когда сервер отправит клиенту ответ.
Подразумевается, что очередь “api-reply” существует. Однако здесь следует заметить вот что: поскольку клиент ожидает ответ на конкретный вызов, то очередь, в которую придёт ответ должна быть уникальна. Для этой ситуации предусмотрена опция exclusive: true в вызове assertQueue — она гарантирует, что данная очередь будет доступна исключительно вызывавшему assertQueue клиенту и видна только в пределах канала связи. Мы могли бы создавать такую эксклюзивную очередь для каждого отдельного вызова RPC. Но это было бы крайне неэффективно (зато очень просто в реализации). Более выгодным вариантом является создание одной очереди на клиента
маршрутизации), которая дополнительно повлияет на решение брокера о том, в какую очередь сообщение будет отправлено.
Обратите внимание: очередь вторична по отношению к бирже. Именно биржа определяет, куда пойдёт сообщение, в какую очередь. Клиенты же могут принимать сообщения только из очереди, поэтому если вы не хотите разбираться с кучей проблем и передать всю маршрутизацию сообщений брокеру — имейте следующее в виду: если вы хотите отделить одни сообщения от других, их нужно разместить в разных очередях.
Другими словами, сообщения в одной очереди должны быть одинаковы по структуре, чтобы вы могли корректно и без усилий распределять их по системе. Рассматривайте очередь как набор элементов одинакового типа.
Варианты работы
Прямая передача
В этом варианте в самом простом случае у нас один клиент и один отправитель. Отправитель шлёт сообщение в очередь, клиент слушает очередь, достаёт из неё сообщения и обрабатывает их. Рассмотрим как это работает на следующем примере.
Библиотека вносит ещё один элемент в работу с очередью: канал. Однако это не более чем просто канал связи между брокером и общающимся с ним компонентом системы. Не следует рассматривать его как часть брокера или очереди сообщений.
Связь с брокером и создание канала
Рассмотрим по порядку, что происходит после установления связи с брокером и создания канала.
Сто сообщений мы передаём для демонстрации масштабирования системы исключительно средствами брокера, что очень просто и безболезненно.
Клиент для тестирования отправителя
Рассмотрим клиент, который нам нужен для тестирования отправителя. Запустим два или даже три таких клиента, после чего запустим отправителя, и убедимся, что все сообщения были распределены между клиентами максимально честно, по алгоритму round-robin.
Обратите внимание: насколько код клиента похож на код отправителя. Отличается лишь то, что вместо отправки сообщений, мы принимаем их. Как было сказано выше, принимать сообщения можно лишь из очереди (так же как отправлять — только на биржу).
Общий алгоритм работы
Аналогично можно отправлять сообщения и с клиента в очередь rpc :