Rabbitmq что это такое
Блог Makeomatic: разработка сайтов и мобильных приложений
RabbitMQ: Простая и эффективная очередь сообщений
что такое RabbitMQ и его применение
В этой статье рассмотрим, как работает RabbitMQ, а также как и для чего её можно использовать в проектах на Node.js
Очереди сообщений
Что такое очередь сообщений ( message queue )? Это некая структура данных, которая обеспечивает хранение и передачу двоичных данных ( blobs ) между различными участниками системы. Очереди сообщений практически всегда используются в крупных системах, благодаря важным преимуществам.
Учитывая огромную важность очередей для надёжных и гибких систем обработки данных, была даже разработана спецификация протокола — AMQP, на основе которой разрабатывается несколько приложений, выполняющих функцию очереди — так называемых «брокеров». Аналогия с биржевыми процессами будет прослеживаться и в дальнейшем. Мы рассмотрим брокер RabbitMQ, авторами которого и создан протокол AMQP.
Почему RabbitMQ?
Причин несколько, но одна из основных — реализация приложения на платформе Erlang/OTP, гарантирующая максимальную стабильность и масштабируемость очереди, как ключевого узла всей системы. Другая причина — полная открытость приложения, распространяющегося по лицензии Mozilla Public License и реализация открытого протокола AMQP, библиотеки для которого существуют во всех основных языках и платформах программирования. В том числе и для Node.js
Основные понятия
Брокер
Под брокером мы будем понимать сам сервер RabbitMQ. Брокер может быть один, брокеров может быть несколько, объединённых в общий кластер. Брокер занимается непосредственно передачей сообщений. Однако на внутреннем уровне происходит намного больше процессов, нежели просто передача байтиков по сети.
Очередь
Очередь — основной логический компонент брокера. Именно из очереди клиент ( consumer ) забирает сообщения. Другое дело, что очередь не единственный участник обмена.
Биржа
Биржа (exchange, иногда переводится как «обмен») играет важнейшую роль в направлении сообщений от отправителя ( producer ) к клиенту (consumer, он же потребитель). Дело в том, что именно благодаря бирже, поступающее от отправителя сообщение направляется в нужную очередь. Кроме того, у сообщения может присутствовать метка ( routingKey ) (ключ м
Это наиболее важная строчка, в которой мы сообщаем брокеру, что сообщение было принято, полностью обработано и его можно безопасно удалить из очереди. Если такое подтверждение брокеру не отправить, то сообщение никогда не будет удалено из очереди и постепенно брокер заполнит всю оперативную память сервера. Будьте внимательны — это одна из самых частых ошибок при работе с очередью.
Важно: отправляйте ack только когда сообщение действительно полностью обработано и его можно удалить. Это будет гарантировать две вещи:
Важно: если сообщение обработать невозможно по техническим или каким-то другим причинам у вас есть два варианта.
Обратите внимание: брокер по-умолчанию сам распределяет нагрузку между клиентами, вам ничего не нужно для этого делать. Один у вас клиент, или пятьдесят — брокеру всё равно.
Publish-Subscribe (он же Broadcast)
Никто не запрещает отправлять сообщения сразу всем клиентам, а не по алгоритму round-robin. Это позволяет использовать очередь в качестве сервера pubsub. Всё, что для этого нужно сделать — определить биржу типа fanout. Делается это в вызове assertExchange :
Как видно, тип биржи передаётся вторым параметром. Поменяйте код отправителя и клиентов (помните, что определения бирж и очередей должны совпадать), как показано выше и попробуйте запустить несколько клиентов. Посмотрите, как будут распределяться сообщения теперь.
Всего одно маленькое дополнение — и совершенно изменившийся алгоритм работы. Как видите, для того, чтобы менять поведение брокера, вовсе не нужно лезть в глубокие настройки сервера. Достаточно слегка поменять код.
Маршрутизация по шаблону
Метка должна содержать несколько слов, разделённых точкой. Например: «a.b» или «animals.feline.tiger». Должна присутствовать по крайней мере одна точка. Максимальный размер метки — 255 байт. Обратите внимание: не символов, байт. Если вы используете символы Unicode, то имейте это ввиду.
Существует два особых знака, которые используются в routingKey при привязке очереди к бирже по метке (и только тогда, но не при отправке!):
Позволит нам принимать все сообщения о животных из семейства кошачьих, не имеющих подвидов.
Ну а следующее сообщение будет получено клиентом, который добавил вышестоящую привязку:
Зато такое сообщение им принято не будет:
Remote Procedure Call
Иногда возникает потребность передать сообщение обработчику И дождаться ответа. Этот сценарий описывает систему «удалённого вызова процедур». Такая система тоже вполне может быть построена с помощью RabbitMQ. Посмотрим, как это сделать.
Клиент
На клиенте всё очень просто: в вызов publish добавляется специальная опция replyTo, значением которой является имя очереди, в которой клиент будет ожидать ответ. Обратите внимание, что в данном случае клиент обращается к серверу именно через publish, поскольку он хочет вызвать удалённую процедуру, находящуюся на сервере. В данном сценарии отправителем будет являться клиент, а потребителем — сервер. Затем их роли поменяются местами, когда сервер отправит клиенту ответ.
Подразумевается, что очередь “api-reply” существует. Однако здесь следует заметить вот что: поскольку клиент ожидает ответ на конкретный вызов, то очередь, в которую придёт ответ должна быть уникальна. Для этой ситуации предусмотрена опция exclusive: true в вызове assertQueue — она гарантирует, что данная очередь будет доступна исключительно вызывавшему assertQueue клиенту и видна только в пределах канала связи. Мы могли бы создавать такую эксклюзивную очередь для каждого отдельного вызова RPC. Но это было бы крайне неэффективно (зато очень просто в реализации). Более выгодным вариантом является создание одной очереди на клиента
маршрутизации), которая дополнительно повлияет на решение брокера о том, в какую очередь сообщение будет отправлено.
Обратите внимание: очередь вторична по отношению к бирже. Именно биржа определяет, куда пойдёт сообщение, в какую очередь. Клиенты же могут принимать сообщения только из очереди, поэтому если вы не хотите разбираться с кучей проблем и передать всю маршрутизацию сообщений брокеру — имейте следующее в виду: если вы хотите отделить одни сообщения от других, их нужно разместить в разных очередях.
Другими словами, сообщения в одной очереди должны быть одинаковы по структуре, чтобы вы могли корректно и без усилий распределять их по системе. Рассматривайте очередь как набор элементов одинакового типа.
Варианты работы
Прямая передача
В этом варианте в самом простом случае у нас один клиент и один отправитель. Отправитель шлёт сообщение в очередь, клиент слушает очередь, достаёт из неё сообщения и обрабатывает их. Рассмотрим как это работает на следующем примере.
Библиотека вносит ещё один элемент в работу с очередью: канал. Однако это не более чем просто канал связи между брокером и общающимся с ним компонентом системы. Не следует рассматривать его как часть брокера или очереди сообщений.
Связь с брокером и создание канала
Рассмотрим по порядку, что происходит после установления связи с брокером и создания канала.
Сто сообщений мы передаём для демонстрации масштабирования системы исключительно средствами брокера, что очень просто и безболезненно.
Клиент для тестирования отправителя
Рассмотрим клиент, который нам нужен для тестирования отправителя. Запустим два или даже три таких клиента, после чего запустим отправителя, и убедимся, что все сообщения были распределены между клиентами максимально честно, по алгоритму round-robin.
Обратите внимание: насколько код клиента похож на код отправителя. Отличается лишь то, что вместо отправки сообщений, мы принимаем их. Как было сказано выше, принимать сообщения можно лишь из очереди (так же как отправлять — только на биржу).
Общий алгоритм работы
Аналогично можно отправлять сообщения и с клиента в очередь rpc :
Ajaxblog
RabbitMQ для начинающих
Иногда в веб-приложениях появляется необходимость выполнить сложные ресурсоемкие задачи, которые не могут быть умещены в коротком временном интервале HTTP запроса. В этом случае на помощь приходят очереди. Основная идея очередей – избежать выполнения ресурсоемких задач непосредственно после отправки запроса. Вместо этого задача ставится в очередь для последующего выполнения в асинхронном режиме. Т.е. при получении запроса от клиента мы инкапсулируем задачу как сообщение и отправляем его в очередь, а уже обработчик очереди достает сообщения в порядке их следования и обрабатывает надлежащим образом. Забегая вперед, скажу, что возможен режим работы очередей, когда при наличии нескольких копий обработчика, следующая задач будет поступать на свободный обработчик. Таким образом достигается распараллеливание выполнения задач.
В данном разделе рассматривается работа с очередями, использующими сервер сообщений RabbitMQ. Сервер RabbitMQ по сути является менеджером очередей, который имеет следующие преимущества:
В туториалах будут приведены примеры для всех вышеперечисленных вариантов. За основу взяты туториалы с официального сайта, дополнены и реализованы на PHP для RabbitMQ.
RabbitMQ испозует протокол AMQP. Чтобы использовть RabbitMQ необходимо поставить клиентскую и серверную части.
Установка сервера
Для установки расширения AMQP для PHP необходимо сначала установить RabbitMQ Server
Добавим следующию строку в файл /etc/apt/sources.list
Установка клиента
Выбираем нужную библиотеку и устанавливаем http://www.rabbitmq.com/devtools.html. Наиболее популярны php-amqplib и PECL AMQP library
Базовые понятия
В RabbitMQ используются следующий обозначения. Продюсер – программа, которая посылает сообщения. Будем обозначать его так
Брокер(очередь) – собственно просто буфер в памяти без каких-либо ограничений на количество хранимых сообщений. В одну и ту же очередь могут отсылать сообщения несколько продюсеров, так же как несколько консьюмеров могут пытаться получить сообщения из одной и той же очереди. Очередь будет обозначена так(сверху указано имя очереди)
Консьюмер(получатель) – программа, которая принимает сообщения из очереди. Будем обозначать его так
Здесь важно отметить, что продюсер, консьюмер и брокер могут быть расположены на различных машинах, более того, в большинстве случаев это именно так.
Первый скрипт работы с очередью, своего рода “Hello world”, будет отсылать текстовое сообщение с клиента, принимать его на сервере и выводить на экран.
Т.е. схема работы следующая: Первое, что надо сделать, это установить соединение с сервером RabbitMQ. Соединение устанавливается командами
это дефолтные значения. Если достаточно дефолтного значения любого из этих параметров, то его можно опустить. И, напротив, если, к примеру, нужно подключиться к другой машине, в параметре host необходимо указать ее имя или ip адрес.
Используя коннект можно получить объект для канала
На основе полученного канала создаем обменник
и, собственно, саму очередь
Когда обменник и очередь готовы, их можно связать по ключу
После того как сообщение отослано, коннект можно разорвать.
Получатель также должен выполнить ту же последовательность – приконнектиться к серверу сообщений; – создать канал; – объявить обменник; – объявить очередь; – связать очередь с обменником по ключу Последние два действия, как упоминалось выше, не обязательны. Теперь можно начать прослушивать очередь
Здесь методу get в качетсве параметра передается константа ARMQ_AUTOACK, которая оповещает сервер сообщений о том, что данное сообщение получено. Это самый простой способ удалить сообщение из очереди. Однако в данном случае в случае неудачной обработки сообщения, вернуть повторно его в очередь нельзя.
Таким образом, получаем два скрипта
Распределенные очереди
Основная идея заключается в следующем. Допустим нужно обработать видео файл, чтобы получить на выходе три сконвертированных файла в различные форматы, информацию о метаданных и создать иконки для этого видеофайла. Т.е. получаем 5 задач, три из которых довольно тяжеловесные(конвертация), одна легкая(получение метаданных) и одна средняя(создание иконок). При этом все эти задачи являются независимыми друг от друга. Таким образом, можно выполнять их одновременно, т.е. распараллелить обработку очереди на уровне сообщений(пункт 2). Для этого при объявлении обменника необходимо установить ему тип AMQP_EX_TYPE_FANOUT. Тогда все сообщения, посылаемые в указанный обменник, независимо от имени очереди и ключа роутера, будут прослушиваться всеми запущенными копиями консьюмера. Т.е. каждое следующее сообщение будет отсылаться на следующий свободный консьюмер. В нашем случае их должно быть пять. Такой способ обработки получил название round-robin dispathing. Обратите внимание, что при отправке продюсером и при получении консьюмером используется одна и та же очередь.
Оповещение (acknowledgment)
Некоторые задачи могут выполняться довольно долго. И неизвестно, что может произойти с сервером в этот момент: сервер может перезагрузиться, либо задача может зависнуть или завершится фатальной ошибкой. В первом туториале оповещение было отключено путем передачи параметра AMQP_AUTOACK в метод get(). В этом случае сообщения удаляются из памяти сразу после выполнения метода get и в случае ошибки, случившейся во время обработки, не вернутся в очередь. Чтобы избежать этого, не будем передавать константу AMQP_AUTOACK в метод get. Вместо этого по завершению обработки вызовем метод ack(), который уведомит брокер о том, что сообщение успешно обработано и его можно удалить из памяти. В противном случае RabbitMQ понимает, что сообщение не обратботано и перенаправляет его другому свободному консьюмеру. Однако здесь стоит отметить один важный момент. Перенаправленные сообщения не будут обрабатываться до того пока консьюмер не отконнектится и приконнектится заново к брокеру. Если необходимо заново обработать сообщение в рамках того же коннекта к серверу сообщений, то необходимо вызвать метод nack() с флагом AMQP_REQUEUE, который поставит неудачно обработанную задачу обратно в очередь и уведомит брокер о том, что эта задача должна быть вновь обработана.
Распростаненная ошибка – при включенном оповещении не подтверждать корректно обработанные задачи(сообщения). В этом случае при каждом новом коннекте, все уже обработанные задачи будут поступать заново на обработку. Процесс будет выглядеть как беспорядочная повторная отпарка сообщений, что в конечном итоге приведет к переполнению памяти. Отследить такую ситуацию можно путем использования нативного инструмента сервера сообщений rabbitmqctl
Жизнеспособность сообщений (durability)
В предыдущем параграфе мы рассмотрели как не потерять сообщение в очереди путем повторной отправки его в очередь. Тем не менее сообщение может быть потеряно в случае если сервер сообщений был неожиданно остановлен. Чтобы этого избежать, очередь должна быть создана с флагом AMQP_DURABLE.
Если очередь ‘hello’ уже была объявлена, то данный код вызовет ошибку, поскольку один раз объявленную очередь нельзя объявить повторно с другими параметрами. Из этой ситуации есть два выхода, либо обнулить все очереди как сказано здесь, либо создать новую очередь с неиспользуемым именем. Посмотреть список очередей можно спопособом упомянутым в предыдущем параграфе. Установка флага AMQP_DURABLE не гарантирует стопроцентную сохранность сообщений в очереди. Несмотря на то, что таким спопосбом мы указываем RabbitMQ сохранять сообщения на диске, существует мертвая зона после получения соощения, когда оно уже в памяти, но еще не сохранено на диске. В этот момент, в случае не предвиденной ситуации, оно может быть утеряно из памяти. Для нашего простого примера таких гарантий достаточно, но если необходимо добиться высоких гарантий получения сообщения, то следует использовать транзакции.
Все вместе
Для примера распределения сообщений между очередями нам понадобится функция, имитирующая загруженность системы. Для этого мы используем обычный таймер
Полный код продюсера (send.php) будет выглядеть так
Обратите внимание, что очереди создаются с абсолютно идентичными параметрами. Как уже было сказано, повторное создание очереди с иными параметрами вызовет исключение. Также стоит отметить, что если вы уверены, что консьюмер будет запускаться первым, то создание очереди в продюсере не обязательно.
Теперь, если запустить несколько копий консьюмера, то можно будет видеть как между ними распределяются сообщения. Предположим, что мы отправили 8 сообщений в очередь, т.е. запустили скрипт send.php 8 раз. После этого запускаем в двух разных терминалах по консьюмеру
Вывод в первом терминале
Вывод во втором терминале
Как видно сообщения распределились по мере нагрузки консьюмера.
Рассылка публикаций
В предыдущем уроке мы распределяли сообщения между всеми консьюмерами. В данном уроке, наоборот, будем отсылать все сообщения из очереди на все консьюмеры. Такой шаблон известен как “публичная рассылка”(publish subscribe). Такое поведение может быть полезно, к примеру, при создании логирования с одновременным выводом сообщения в терминал. Т.е. один консьюмер получает сообщение и сохраняет его на диска, в то время как другой выводит это сообщение на экране.
В предыдущих разделах мы не заостряли внимание на обменнике(exchanger). На самом деле продюсер никогда не отправляет сообщения непосредственно в очередь. Он размещает их в обменнике. Собственно говоря, продюсер и не знает было ли сообщение доставлено в очередь или нет. Обменник представляет собой простую вещь – он получает сообщения от продюсера и отправляет(публикует) их в очередь. При этом обменник четко знает по какому алгоритму он работает:
В нашем примере будем использовть тип обменника fanout.
Для этой цели продюсер не создает именованную очередь. Консьюмер же, в свою очередь, создает анонимную очередь, в которую принимает сообщения продюсера. При таком подходе каждый консьюмер будет принимать все сообщения продюсера.
Анонимные очереди
В предыдущем уроке у нас была необходимость рассылки сообщений в очереди с одинаковыми именами для возможности распределения сообщений между продюсерами и консьюмерами. Для достижения же текущей цели нам нужны выполнить две вещи. Во-первых, нам нужны очереди с различными именами. Во-вторых, созданные очереди должны автоматически удаляться после окончания работы скрипта. Для создания рандомного имени, можно воспользоваться одной из функций генерации хеша, к примеру sha1 или md5. Или же оставить эту задачу серверу сообщений. Если при объявлении очереди не устанавливать ей имя, то RabbitMQ сам задаст рандомное имя очереди. Для возможности автоматического удаления очереди, при ее создании нужно задать флаги AMQP_IFUNUSED, AMQP_AUTODELETE.
Связывание (bindings)
Мы уже создали обменник с типо fanout и очередь. Теперь нужно сказать обменнику, что он должен публиковать сообщения имеено в эту очередь. Это отношение называется связыванием (binding)
Здесь второй параметр – ключ, по которому связывается обменник и очередь. В данном случае он может быть любой строкой, поскольку его значение игнорируется в случае, если обменник имеет тип fanout.
Все вместе
Поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.
Селективная рассылка
В предыдущем уроке была рассмотрена возможность отсылки сообщений нескольким получателям.В данном уроке мы рассмотрим как отсылать сообщения в четко определенные очереди. Такая возможность может понадобиться, к примеру, если мы не хотим сохранять все сообщения на диске, а только критический. В то время как на экран будут выводиться все сообщения.
Связывание (bindings)
Связываение уже упоминалось в предыдущем уроке
Повторимся, оно нужно, чтобы сказать обменнику, что он должен публиковать сообщения имеено в эту очередь. В методе bind() имеется второй параметр – ключ(routingKey), по которому связывается обменник и очередь. В данном уроке он будет играть основную роль. Стоит также напомнить, что ключ напрямую зависит от типа обменника. Так для обменника с типом fanout, он просто игнорируется. К примеру, если нужно связать обменник и очередь по ключу ‘failure_messages’
Прямое связывание (точка-точка)
В предыдущем уроке система логирования выполняла широковещательную рассылку всем консьюмерам. Теперь мы хотим расширить это поведение путем добавления фильтра сообщений по их важности. К примеру, критические ошибки писать на диск, а предупреждения только выводить на экран с целью экономии дискового пространтсва. Ранее мы использовали обменник с типом fanout, который не позволяет это сделать. Сейчас мы используем другой тип обменника – direct, который отправляет сообщения только тем очередям, routingKey которых совпадает с routingKey сообщения. Это поведение проиллюстрировано на изображении
На изображении можно видеть обменник X с типом direct, который связан с очередью Q1 по ключу failure, и с очередью Q2 по ключам notice и warning. В данном случае все сообщения с ключем failure будут отсылаться только в очередь Q1, а все сообщения с ключами notice и warning будут отсылаться в очередь Q2. Сообщения, ключи которые не совпадают с выше указанными, будут игнорироваться всеми очередями.
Множественная связь
Вполне возможно несколько очередей связать с обменником по одному и тому же ключу. Т.е. для нашего примера мы вполне можем установить связь по ключу notice между обменником и очередью Q1 и между обменником и очередью Q2. В таком случае сообщения с ключем notice будут отсылаться на обе очереди, т.е. получаем поведение аналогичное обменнику с типом fanout.
Отправка сообщений
Для отправки сообщений способом точка-точка обменник должен быть создан с типом direct, который сооветствует константе AMQP_EX_TYPE_DIRECT.
После чего возможна публикация сообщений по ключу
Получение сообщений Получение сообщений ничем не отличается от предыдущего урока, за исключением того, что нам нужно связать обменник с очередью по каждому типу
Все вместе
Как и в предыдущем уроке, поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.
Рассылка по шаблону
В предыдущем уроке мы улучшили нашу систему логирования путем использования обменников с типом direct, создав возможность получать сообщения выборочно. Следующим этапом будет создание системы, позволяющей логировать по множеству критериев. Допустим мы хотим разделить обработаку логирования основываясь не только на важности сообщений, но и по устройствам, вызвавшим эту ошибку. Это предоставило бы нам большую гибкость – к примеру, можно было бы выделить обработку логов для критических ошибок, инициированных кроном, и отдельно выделить обработку логов всех сообщений от ядра системы. Для имплементации такой возможности нам неоходимо нечто большее, чем прямая рассылка сообщений (рассылка по методу точка-точка). Связывание по шаблону
Для выполнения связи по шалбону обменник должен иметь тип topic, который определяется константой AMQP_EX_TYPE_TOPIC. Ключи routingKey составляются из слова, следующих через точку, например, “logs.devices.kernel.notice”, “logs.devices.cron”. Максимальная длина такого ключа может составлять 255 символов. Логика доставки сообщений по ключу схожа с логикой для обменников с типом direct – сообщения с определенным ключем будут доставлены в очереди с соответствующим ключем. Но есть одна большая разница. Ключи, используемые для связи по шаблону, могут содержать два специальных символа:
Например, имеем следующие связи
*.orange.*
*.*.rabbit
lazy.#
Первое слово описывает скорость, второе – цвет и третье – вид животного, т.е. [speed][color][species]. Мы создали три связи: очередь Q1 связали по ключу “.orange.” и очередь Q2 – по ключам “..rabbit” и “lazy.#”. Таким образом, можно сказать, что очередь Q1 рассматривает всех оранжевых животных, а очередь Q2 – всех зайцев и всех медленных животных.
Рассмотрим несколько примеров:
Обменник с типом topic может повторять поведение обменника с типом fanout, если с ним связать очередь по ключу “#”. Если в ключе не испльзовать специальных символов, то такой обменник будет соответствовать обменнику с типом direct.
Отправка сообщений
Для отправки сообщений по шаблону обменник должен быть создан с типом topic, который сооветствует константе AMQP_EX_TYPE_TOPIC.
После чего возможна публикация сообщений по ключу
Получение сообщений
Получение сообщений ничем не отличается от предыдущего урока
Все вместе
Как и в предыдущем уроке, поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.
Реализация RPC шаблона
Во втором уроке была реализована очередь, которая распределяла нагрузку между всеми имеющимися консьюмерами. Но, что если нам нужно получить результат от обработчика очереди. Такой подход известен как вызов удаленных процедур или RPC(remote procedure call). В этом уроке будет реализована модель RPC с использованием очереди сообщений RabbitMQ. Конечно, такой подход предполагает, что обработка не должна занимать много времени. Для реализации примера наша функция обработчик будет изменять сообщение “message before” на “message after”.
В целом, реализация RPC посредством RabbitMQ довольно проста. Клиент отправляет сообщение, а сервере отвечает. Для обработки ответа сервера, необходимо создать callback очередь. Чтобы узнать какая callback очередь ожидает ответа, мы должны в запросе послать ее имя. Для этого на продюсере создается анонимная очередь и ее имя добавляется в параметры запроса
Обратите внимание, что callback очередь создается с флагом AMQP_EXCLUSIVE, что означает, что только один консьюмер может слушать эту очередь.
Correlation ID
В методе, представленном выше, мы предполагаем создавать callback очередь для каждого RPC запроса. Поскольку нельзя однозначно по имени очереди определить какому запросу принадлежит ответ, в запрос также добавляется параметр correlationId, который имеет уникальное значение для каждого запроса. Позже, когда мы получим ответ, мы сможем сравнить его correlationId со значением, переданным вместе с запросом. И в случае их несовпадения просто отбросить полученный ответ.
Итоговый план действий
Все вместе
Функция обработки сообщения на стороне сервера выглядит следующим образом
Функция обработки сообщения на стороне клиента