Rabbitmq что такое exchange

RabbitMQ tutorial 3 — Публикация/Подписка

Хочу продолжить серию перевода уроков с официального сайта. Примеры будут на php, но их можно реализовать на большинстве популярных ЯП.

Публикация/Подписка

В предыдущей статье было рассмотрено создание рабочей очереди сообщений. Было сделано допущение, что каждое сообщение будет направлено одному обработчику(worker). В этой статье усложним задачу – отправим сообщение нескольким подписчикам. Этот паттерн известен как «publish/subscribe» (публикация/подписка).
Чтобы понять этот шаблон, создадим простую систему логирования. Она будет состоять из двух программ – первая будет создавать логи, вторая считывать и печатать их.

В нашей систему логирования каждая программа подписчик будет получать каждое сообщение. Благодаря этому, мы сможем запустить одного подписчика на сохранение логов на диск, а потом в любое время сможем создать другого подписчика для отображения логов на экран.

По существу, каждое сообщение будет транслироваться каждому подписчику.

Точки обмена(exchanges)

В предыдущих статьях для отправки и принятия сообщений мы работали с очередью. Теперь рассмотрим расширенную модель отправки сообщений Rabbit.

Основная идея в модели отправки сообщений Rabbit – Поставщик(producer) никогда не отправляет сообщения напрямую в очередь. Фактически, довольно часто поставщик не знает, дошло ли его сообщение до конкретной очереди.
Вместо этого поставщик отправляет сообщение в точку доступа. В точке доступа нет ничего сложного. Точка доступа выполняет две функции:

— получает сообщения от поставщика;
— отправляет эти сообщения в очередь.

Точка доступа точно знает, что делать с поступившими сообщениями. Отправить сообщение в конкретную очередь, либо в несколько очередей, либо не отправлять никому и удалить его. Эти правила описываются в типе точки доступа (exchange type).

Rabbitmq что такое exchange. image loader. Rabbitmq что такое exchange фото. Rabbitmq что такое exchange-image loader. картинка Rabbitmq что такое exchange. картинка image loader

Существуют несколько типов: direct, topic, headers и fanout. Мы остановимся на последнем типе fanout. Создадим точку с доступа с этим типом и назовем её – logs:

Тип fanout – очень прост. Он копирует все сообщения которые поступают к нему во все очереди, которые ему доступны. Это то что нам нужно для нашей системы логирования.

Просмотр списка точек доступа:

Чтобы посмотреть все точки доступа на сервере, необходимо выполнить команду rabbitmqctl:

Мы видим список точек доступа с наименованием amq.* и точку доступа без имени, которая используется по умолчанию (она не подходит для выполнения нашей задачи).

Наименование точек доступа.

В предыдущих статьях мы ничего не знали о точках доступа, но всё-таки могли отправлять письма в очередь. Это было возможно, потому что использовали точку доступа по умолчанию, которая идентифицируется пустой строкой “”.
Вспомним как раньше мы отправляли письма:

Здесь используется точка доступа по умолчанию или безымянная точка доступа: сообщение направляется в очередь, идентифицированную через ключ “routing_key”. Ключ “routing_key” передается через третий параметр функции basic_publish.

Теперь мы можем отправить сообщение в нашу именованную точку доступа:

Временные очереди:

Всё это время мы использовали наименование очередей (“hello“ или “task_queue”). Возможность давать наименования помогает указать обработчикам (workers) определенную очередь, а также делить очередь между продюсерами и подписчиками.

Но наша система логирования требует, чтобы в очередь поступали все сообщения, а не только часть. Также мы хотим, чтобы сообщения были актуальными, а не старыми. Для этого нам понадобиться 2 вещи:

— Каждый раз когда мы соединяемся с Rabbit, мы создаем новую очередь, или даем создать серверу случайное наименование;
— Каждый раз когда подписчик отключается от Rabbit, мы удаляем очередь.

В php-amqplib клиенте, когда мы обращаемся к очереди без наименовании, мы создаем временную очередь и автоматически сгенерированным наименованием:

Метод вернет автоматически сгенерированное имя очереди. Она может быть такой – ‘amq.gen-JzTY20BRgKO-HjmUJj0wLg.’.
Когда заявленное соединение оборвется, очередь автоматически удалиться.

Переплеты(Bindings)

Rabbitmq что такое exchange. image loader. Rabbitmq что такое exchange фото. Rabbitmq что такое exchange-image loader. картинка Rabbitmq что такое exchange. картинка image loader

Итак, у нас есть точка доступа с типом fanout и очередь. Сейчас нам нужно сказать точке доступа, чтобы она отправила сообщение в очередь. Отношение между точкой доступа и очередью называется bindings.

С этого момента, сообщения для нашей очереди проходят через точку доступа.
Посмотреть список binding-ов можно используя команду rabbitmqctl list_bindings.

Отправка во все очереди:

Rabbitmq что такое exchange. image loader. Rabbitmq что такое exchange фото. Rabbitmq что такое exchange-image loader. картинка Rabbitmq что такое exchange. картинка image loader

Программа продюсер, которая создает сообщения, не изменилась с предыдущей статьи. Единственное важное отличие – теперь мы направляем сообщения в нашу именованную точку доступа ‘logs’, вместо точки доступа по умолчанию. Нам нужно было указать имя очереди при отправки сообщения. Но для точки доступа с типом fanout в этом нет необходимости.

Рассмотрим код скрипта emit_log.php:

Как вы видите, после установки соединения мы создаем точку доступа. Этот шаг необходим, так как использование несуществующей точки доступа – запрещено.

Сообщение в точке доступа будут потеряны, так как ни одна очередь не связана с точкой доступа. Но это хорошо для нас: пока нет ни одного подписчика нашей точки доступа, все сообщения могут безопасно удалятся.

Код подписчика receive_logs.php:

Если вы хотите сохранить логи в файл, вам потребуется открыть консоль и набрать:

$ php receive_logs.php > logs_from_rabbit.log
Если вы хотите отобразить логи на экран, откройте еще одно окно и наберите:

$ php receive_logs.php
Ну и, конечно, запуск продюсера сообщений:

С помощью команды rabbitmqctl list_bindings мы можем удостовериться, что код правильно создал очередь и связал её с точкой доступа. С двумя открытыми программами receive_logs.php у вас должно получиться следующее:

В следующей статье будет описано, как прослушать только часть сообщений.

Источник

AMQP 0-9-1 Model Explained

Overview

This guide provides an overview of the AMQP 0-9-1 protocol, one of the protocols supported by RabbitMQ.

High-level Overview of AMQP 0-9-1 and the AMQP Model

What is AMQP 0-9-1?

AMQP 0-9-1 (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.

Brokers and Their Role

Messaging brokers receive messages from publishers (applications that publish them, also known as producers) and route them to consumers (applications that process them).

Since it is a network protocol, the publishers, consumers and the broker can all reside on different machines.

AMQP 0-9-1 Model in Brief

The AMQP 0-9-1 Model has the following view of the world: messages are published to exchanges, which are often compared to post offices or mailboxes. Exchanges then distribute message copies to queues using rules called bindings. Then the broker either deliver messages to consumers subscribed to queues, or consumers fetch/pull messages from queues on demand.

Rabbitmq что такое exchange. hello world example routing. Rabbitmq что такое exchange фото. Rabbitmq что такое exchange-hello world example routing. картинка Rabbitmq что такое exchange. картинка hello world example routing

When publishing a message, publishers may specify various message attributes (message meta-data). Some of this meta-data may be used by the broker, however, the rest of it is completely opaque to the broker and is only used by applications that receive the message.

Networks are unreliable and applications may fail to process messages therefore the AMQP 0-9-1 model has a notion of message acknowledgements: when a message is delivered to a consumer the consumer notifies the broker, either automatically or as soon as the application developer chooses to do so. When message acknowledgements are in use, a broker will only completely remove a message from a queue when it receives a notification for that message (or group of messages).

In certain situations, for example, when a message cannot be routed, messages may be returned to publishers, dropped, or, if the broker implements an extension, placed into a so-called «dead letter queue». Publishers choose how to handle situations like this by publishing messages using certain parameters.

Queues, exchanges and bindings are collectively referred to as AMQP entities.

AMQP 0-9-1 is a Programmable Protocol

AMQP 0-9-1 is a programmable protocol in the sense that AMQP 0-9-1 entities and routing schemes are primarily defined by applications themselves, not a broker administrator. Accordingly, provision is made for protocol operations that declare queues and exchanges, define bindings between them, subscribe to queues and so on.

This gives application developers a lot of freedom but also requires them to be aware of potential definition conflicts. In practice, definition conflicts are rare and often indicate a misconfiguration.

Applications declare the AMQP 0-9-1 entities that they need, define necessary routing schemes and may choose to delete AMQP 0-9-1 entities when they are no longer used.

Exchanges and Exchange Types

Exchanges are AMQP 0-9-1 entities where messages are sent to. Exchanges take a message and route it into zero or more queues. The routing algorithm used depends on the exchange type and rules called bindings. AMQP 0-9-1 brokers provide four exchange types:

Exchange typeDefault pre-declared names
Direct exchange(Empty string) and amq.direct
Fanout exchangeamq.fanout
Topic exchangeamq.topic
Headers exchangeamq.match (and amq.headers in RabbitMQ)

Besides the exchange type, exchanges are declared with a number of attributes, the most important of which are:

Exchanges can be durable or transient. Durable exchanges survive broker restart whereas transient exchanges do not (they have to be redeclared when broker comes back online). Not all scenarios and use cases require exchanges to be durable.

Default Exchange

The default exchange is a direct exchange with no name (empty string) pre-declared by the broker. It has one special property that makes it very useful for simple applications: every queue that is created is automatically bound to it with a routing key which is the same as the queue name.

For example, when you declare a queue with the name of «search-indexing-online», the AMQP 0-9-1 broker will bind it to the default exchange using «search-indexing-online» as the routing key (in this context sometimes referred to as the binding key). Therefore, a message published to the default exchange with the routing key «search-indexing-online» will be routed to the queue «search-indexing-online». In other words, the default exchange makes it seem like it is possible to deliver messages directly to queues, even though that is not technically what is happening.

Direct Exchange

A direct exchange delivers messages to queues based on the message routing key. A direct exchange is ideal for the unicast routing of messages (although they can be used for multicast routing as well). Here is how it works:

Direct exchanges are often used to distribute tasks between multiple workers (instances of the same application) in a round robin manner. When doing so, it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers and not between queues.

A direct exchange can be represented graphically as follows:

Rabbitmq что такое exchange. exchange direct. Rabbitmq что такое exchange фото. Rabbitmq что такое exchange-exchange direct. картинка Rabbitmq что такое exchange. картинка exchange direct

Fanout Exchange

A fanout exchange routes messages to all of the queues that are bound to it and the routing key is ignored. If N queues are bound to a fanout exchange, when a new message is published to that exchange a copy of the message is delivered to all N queues. Fanout exchanges are ideal for the broadcast routing of messages.

Because a fanout exchange delivers a copy of a message to every queue bound to it, its use cases are quite similar:

A fanout exchange can be represented graphically as follows:

Rabbitmq что такое exchange. exchange fanout. Rabbitmq что такое exchange фото. Rabbitmq что такое exchange-exchange fanout. картинка Rabbitmq что такое exchange. картинка exchange fanout

Topic Exchange

Topic exchanges route messages to one or many queues based on matching between a message routing key and the pattern that was used to bind a queue to an exchange. The topic exchange type is often used to implement various publish/subscribe pattern variations. Topic exchanges are commonly used for the multicast routing of messages.

Topic exchanges have a very broad set of use cases. Whenever a problem involves multiple consumers/applications that selectively choose which type of messages they want to receive, the use of topic exchanges should be considered.

Headers Exchange

A headers exchange is designed for routing on multiple attributes that are more easily expressed as message headers than a routing key. Headers exchanges ignore the routing key attribute. Instead, the attributes used for routing are taken from the headers attribute. A message is considered matching if the value of the header equals the value specified upon binding.

It is possible to bind a queue to a headers exchange using more than one header for matching. In this case, the broker needs one more piece of information from the application developer, namely, should it consider messages with any of the headers matching, or all of them? This is what the «x-match» binding argument is for. When the «x-match» argument is set to «any», just one matching header value is sufficient. Alternatively, setting «x-match» to «all» mandates that all the values must match.

Headers exchanges can be looked upon as «direct exchanges on steroids». Because they route based on header values, they can be used as direct exchanges where the routing key does not have to be a string; it could be an integer or a hash (dictionary) for example.

Note that headers beginning with the string x- will not be used to evaluate matches.

Queues

Queues in the AMQP 0-9-1 model are very similar to queues in other message- and task-queueing systems: they store messages that are consumed by applications. Queues share some properties with exchanges, but also have some additional properties:

Before a queue can be used it has to be declared. Declaring a queue will cause it to be created if it does not already exist. The declaration will have no effect if the queue does already exist and its attributes are the same as those in the declaration. When the existing queue attributes are not the same as those in the declaration a channel-level exception with code 406 ( PRECONDITION_FAILED ) will be raised.

Queue Names

Applications may pick queue names or ask the broker to generate a name for them. Queue names may be up to 255 bytes of UTF-8 characters. An AMQP 0-9-1 broker can generate a unique queue name on behalf of an app. To use this feature, pass an empty string as the queue name argument. The generated name will be returned to the client with queue declaration response.

Queue names starting with «amq.» are reserved for internal use by the broker. Attempts to declare a queue with a name that violates this rule will result in a channel-level exception with reply code 403 ( ACCESS_REFUSED ).

Queue Durability

In AMQP 0-9-1, queues can be declared as durable or transient. Metadata of a durable queue is stored on disk, while metadata of a transient queue is stored in memory when possible.

The same distinction is made for messages at publishing time.

In environments and use cases where durability is important, applications must use durable queues and make sure that publish mark published messages as persisted.

This topic is covered in more detailed in the Queues guide.

Bindings

Bindings are rules that exchanges use (among other things) to route messages to queues. To instruct an exchange E to route messages to a queue Q, Q has to be bound to E. Bindings may have an optional routing key attribute used by some exchange types. The purpose of the routing key is to select certain messages published to an exchange to be routed to the bound queue. In other words, the routing key acts like a filter.

To draw an analogy:

Having this layer of indirection enables routing scenarios that are impossible or very hard to implement using publishing directly to queues and also eliminates certain amount of duplicated work application developers have to do.

If a message cannot be routed to any queue (for example, because there are no bindings for the exchange it was published to) it is either dropped or returned to the publisher, depending on message attributes the publisher has set.

Consumers

Storing messages in queues is useless unless applications can consume them. In the AMQP 0-9-1 Model, there are two ways for applications to do this:

With the «push API», applications have to indicate interest in consuming messages from a particular queue. When they do so, we say that they register a consumer or, simply put, subscribe to a queue. It is possible to have more than one consumer per queue or to register an exclusive consumer (excludes all other consumers from the queue while it is consuming).

Each consumer (subscription) has an identifier called a consumer tag. It can be used to unsubscribe from messages. Consumer tags are just strings.

Message Acknowledgements

Consumer applications – that is, applications that receive and process messages – may occasionally fail to process individual messages or will sometimes just crash. There is also the possibility of network issues causing problems. This raises a question: when should the broker remove messages from queues? The AMQP 0-9-1 specification gives consumers control over this. There are two acknowledgement modes:

The former choice is called the automatic acknowledgement model, while the latter is called the explicit acknowledgement model. With the explicit model the application chooses when it is time to send an acknowledgement. It can be right after receiving a message, or after persisting it to a data store before processing, or after fully processing the message (for example, successfully fetching a Web page, processing and storing it into some persistent data store).

If a consumer dies without sending an acknowledgement, the broker will redeliver it to another consumer or, if none are available at the time, the broker will wait until at least one consumer is registered for the same queue before attempting redelivery.

Rejecting Messages

When a consumer application receives a message, processing of that message may or may not succeed. An application can indicate to the broker that message processing has failed (or cannot be accomplished at the time) by rejecting a message. When rejecting a message, an application can ask the broker to discard or requeue it. When there is only one consumer on a queue, make sure you do not create infinite message delivery loops by rejecting and requeueing a message from the same consumer over and over again.

Negative Acknowledgements

Messages are rejected with the basic.reject method. There is one limitation that basic.reject has: there is no way to reject multiple messages as you can do with acknowledgements. However, if you are using RabbitMQ, then there is a solution. RabbitMQ provides an AMQP 0-9-1 extension known as negative acknowledgements or nacks. For more information, please refer to the Confirmations and basic.nack extension guides.

Prefetching Messages

For cases when multiple consumers share a queue, it is useful to be able to specify how many messages each consumer can be sent at once before sending the next acknowledgement. This can be used as a simple load balancing technique or to improve throughput if messages tend to be published in batches. For example, if a producing application sends messages every minute because of the nature of the work it is doing.

Note that RabbitMQ only supports channel-level prefetch-count, not connection or size based prefetching.

Message Attributes and Payload

Messages in the AMQP 0-9-1 model have attributes. Some attributes are so common that the AMQP 0-9-1 specification defines them and application developers do not have to think about the exact attribute name. Some examples are

Some attributes are used by AMQP brokers, but most are open to interpretation by applications that receive them. Some attributes are optional and known as headers. They are similar to X-Headers in HTTP. Message attributes are set when a message is published.

Messages also have a payload (the data that they carry), which AMQP brokers treat as an opaque byte array. The broker will not inspect or modify the payload. It is possible for messages to contain only attributes and no payload. It is common to use serialisation formats like JSON, Thrift, Protocol Buffers and MessagePack to serialize structured data in order to publish it as the message payload. Protocol peers typically use the «content-type» and «content-encoding» fields to communicate this information, but this is by convention only.

Messages may be published as persistent, which makes the broker persist them to disk. If the server is restarted the system ensures that received persistent messages are not lost. Simply publishing a message to a durable exchange or the fact that the queue(s) it is routed to are durable doesn’t make a message persistent: it all depends on persistence mode of the message itself. Publishing messages as persistent affects performance (just like with data stores, durability comes at a certain cost in performance).

Message Acknowledgements

Since networks are unreliable and applications fail, it is often necessary to have some kind of processing acknowledgement. Sometimes it is only necessary to acknowledge the fact that a message has been received. Sometimes acknowledgements mean that a message was validated and processed by a consumer, for example, verified as having mandatory data and persisted to a data store or indexed.

This situation is very common, so AMQP 0-9-1 has a built-in feature called message acknowledgements (sometimes referred to as acks) that consumers use to confirm message delivery and/or processing. If an application crashes (the AMQP broker notices this when the connection is closed), if an acknowledgement for a message was expected but not received by the AMQP broker, the message is re-queued (and possibly immediately delivered to another consumer, if any exists).

Having acknowledgements built into the protocol helps developers to build more robust software.

AMQP 0-9-1 Methods

AMQP 0-9-1 is structured as a number of methods. Methods are operations (like HTTP methods) and have nothing in common with methods in object-oriented programming languages. Protocol methods in AMQP 0-9-1 are grouped into classes. Classes are just logical groupings of AMQP methods. The AMQP 0-9-1 reference has full details of all the AMQP methods.

Let us take a look at the exchange class, a group of methods related to operations on exchanges. It includes the following operations:

(note that the RabbitMQ site reference also includes RabbitMQ-specific extensions to the exchange class that we will not discuss in this guide).

As an example, the client asks the broker to declare a new exchange using the exchange.declare method:

Rabbitmq что такое exchange. exchange declare. Rabbitmq что такое exchange фото. Rabbitmq что такое exchange-exchange declare. картинка Rabbitmq что такое exchange. картинка exchange declare

As shown on the diagram above, exchange.declare carries several parameters. They enable the client to specify exchange name, type, durability flag and so on.

If the operation succeeds, the broker responds with the exchange.declare-ok method:

Rabbitmq что такое exchange. exchange declare ok. Rabbitmq что такое exchange фото. Rabbitmq что такое exchange-exchange declare ok. картинка Rabbitmq что такое exchange. картинка exchange declare ok

exchange.declare-ok does not carry any parameters except for the channel number (channels will be described later in this guide).

The sequence of events is very similar for another method pair on the AMQP 0-9-1 queue method class: queue.declare and queue.declare-ok :

Rabbitmq что такое exchange. queue declare. Rabbitmq что такое exchange фото. Rabbitmq что такое exchange-queue declare. картинка Rabbitmq что такое exchange. картинка queue declare

Rabbitmq что такое exchange. queue declare ok. Rabbitmq что такое exchange фото. Rabbitmq что такое exchange-queue declare ok. картинка Rabbitmq что такое exchange. картинка queue declare ok

Connections

AMQP 0-9-1 connections are typically long-lived. AMQP 0-9-1 is an application level protocol that uses TCP for reliable delivery. Connections use authentication and can be protected using TLS. When an application no longer needs to be connected to the server, it should gracefully close its AMQP 0-9-1 connection instead of abruptly closing the underlying TCP connection.

Channels

Some applications need multiple connections to the broker. However, it is undesirable to keep many TCP connections open at the same time because doing so consumes system resources and makes it more difficult to configure firewalls. AMQP 0-9-1 connections are multiplexed with channels that can be thought of as «lightweight connections that share a single TCP connection».

Every protocol operation performed by a client happens on a channel. Communication on a particular channel is completely separate from communication on another channel, therefore every protocol method also carries a channel ID (a.k.a. channel number), an integer that both the broker and clients use to figure out which channel the method is for.

A channel only exists in the context of a connection and never on its own. When a connection is closed, so are all channels on it.

For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.

Virtual Hosts

To make it possible for a single broker to host multiple isolated «environments» (groups of users, exchanges, queues and so on), AMQP 0-9-1 includes the concept of virtual hosts (vhosts). They are similar to virtual hosts used by many popular Web servers and provide completely isolated environments in which AMQP entities live. Protocol clients specify what vhosts they want to use during connection negotiation.

AMQP is Extensible

AMQP 0-9-1 has several extension points:

These features make the AMQP 0-9-1 Model even more flexible and applicable to a very broad range of problems.

AMQP 0-9-1 Clients Ecosystem

There are many AMQP 0-9-1 clients for many popular programming languages and platforms. Some of them follow AMQP terminology closely and only provide implementation of AMQP methods. Some others have additional features, convenience methods and abstractions. Some of the clients are asynchronous (non-blocking), some are synchronous (blocking), some support both models. Some clients support vendor-specific extensions (for example, RabbitMQ-specific extensions).

Because one of the main AMQP goals is interoperability, it is a good idea for developers to understand protocol operations and not limit themselves to terminology of a particular client library. This way communicating with developers using different libraries will be significantly easier.

Getting Help and Providing Feedback

If you have questions about the contents of this guide or any other topic related to RabbitMQ, don’t hesitate to ask them on the RabbitMQ mailing list.

Help Us Improve the Docs <3

If you’d like to contribute an improvement to the site, its source is available on GitHub. Simply fork the repository and submit a pull request. Thank you!

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *