@Override

public Message postProcessMessage(Message message) throws JMSException {

message.setStringProperty("X_ORDER_SOURCE", "WEB");

return message;

}

});

Возможно, вы заметили, что MessagePostProcessor - это функциональный интерфейс. Это означает, что вы можете немного упростить его, заменив анонимный внутренний класс лямбда-выражением:

jms.convertAndSend("tacocloud.order.queue", order,

message -> {

message.setStringProperty("X_ORDER_SOURCE", "WEB");

return message;

});

Хотя вам нужен только этот конкретный MessagePostProcessor для этого одного вызова метода convertAndSend(), вы можете использовать один и тот же MessagePostProcessor для нескольких различных вызовов convertAndSend(). В этих случаях, возможно, ссылка на метод является лучшим выбором, чем лямбда, во избежание ненужного дублирования кода:

@GetMapping("/convertAndSend/order")

public String convertAndSendOrder() {

Order order = buildOrder();

jms.convertAndSend("tacocloud.order.queue", order,

this::addOrderSource);

return "Convert and sent order";

}

private Message addOrderSource(Message message) throws JMSException {

message.setStringProperty("X_ORDER_SOURCE", "WEB");

return message;

}

Теперь вы видели несколько способов отправки сообщений. Но не стоит отправлять сообщение, если никто его не получит. Давайте посмотрим, как вы можете получать сообщения с помощью Spring и JMS.

8.1.3 Получение JMS сообщений

Когда дело доходит до получения сообщений, у вас есть выбор pull (получения) модель, где ваш код запрашивает сообщение и ожидает его получение, или push (проталкивания) модель, в которой сообщения передаются в ваш код по мере их поступления.

JmsTemplate предлагает несколько методов для получения сообщений, но все они используют pull модель. Вы вызываете один из этих методов для запроса сообщения, и поток блокируется до тех пор, пока сообщение не станет доступным (что может быть немедленно или может занять некоторое время).

С другой стороны, у вас также есть возможность использовать push-модель, в которой вы определяете слушатель (listener) сообщений, который вызывается каждый раз, когда сообщение доступно.

Оба варианта подходят для различных вариантов использования. Общепринято, что push-модель - лучший выбор, так как она не блокирует поток. Но в некоторых случаях использование слушателя (listener) может быть перегружено, если сообщения приходят слишком быстро. Pull (извлечения) модель позволяет потребителю заявить, что он готов обработать новое сообщение.

Давайте рассмотрим оба способа получения сообщений. Начнем с pull модели, предлагаемой JmsTemplate.

ПОЛУЧЕНИЕ С JmsTemplate

JmsTemplate предлагает несколько методов для методов извлечения из брокера, включая следующие:

Message receive() throws JmsException;

Message receive(Destination destination) throws JmsException;

Message receive(String destinationName) throws JmsException;

Object receiveAndConvert() throws JmsException;

Object receiveAndConvert(Destination destination) throws JmsException;

Object receiveAndConvert(String destinationName) throws JmsException;

Как видите, эти шесть методов отражение методов send() и convertAndSend () из JmsTemplate. Методы receive() получают необработанное Message, а методы receiveAndConvert() используют сконфигурированный конвертер сообщений для преобразования сообщений в типы доменов. И для каждого из них вы можете указать либо Destination, либо String, содержащую имя пункта назначения, или вы можете получить сообщение из пункта назначения по умолчанию.

Чтобы увидеть их в действии, вы напишите некоторый код, который извлекает Order из пункта назначения tacocloud.order.queue. В следующем листинге показан OrderReceiver, компонент службы, который получает данные заказа с помощью JmsTemplate.receive().

Листинг 8.2. Получение заказов из очереди

package tacos.kitchen.messaging.jms;

import javax.jms.Message;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.jms.support.converter.MessageConverter;

import org.springframework.stereotype.Component;

@Component

public class JmsOrderReceiver implements OrderReceiver {

private JmsTemplate jms;

private MessageConverter converter;

@Autowired

public JmsOrderReceiver(JmsTemplate jms, MessageConverter converter) {

this.jms = jms;

this.converter = converter;

}

public Order receiveOrder() {

Message message = jms.receive("tacocloud.order.queue");

return (Order) converter.fromMessage(message);

}

}

Здесь вы использовали String, чтобы указать пункт назначения для получения заказа. Метод receive() возвращает не конвертированный Message. Нам нужен Order, который находится внутри, Message, поэтому следующее, что происходит, - это использование конвертера сообщения для его преобразования. Свойство type ID в сообщении поможет конвертеру преобразовать его в Order, но ответ будет в виде Object, который придется привести к нужному типу перед тем, как вы сможете его вернуть.

Получение необработанного объекта Message может быть полезно в некоторых случаях, когда вам необходимо проверить свойства и заголовки сообщения. Но часто вам нужна только полезная нагрузка. Преобразование этой полезной нагрузки в тип домена является двухэтапным процессом и требует, чтобы преобразователь сообщений был внедрен в компонент. Когда вы заботитесь только о полезной нагрузке сообщения, использовать receiveAndConvert() будет намного проще. В следующем листинге показано, как JmsOrderReceiver может быть переработан для использования receiveAndConvert () вместо receive().

Листинг 8.3 Получение преобразованного объекта Order

package tacos.kitchen.messaging.jms;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.stereotype.Component;

@Component

public class JmsOrderReceiver implements OrderReceiver {

private JmsTemplate jms;

@Autowired

public JmsOrderReceiver(JmsTemplate jms) {

this.jms = jms;

}

public Order receiveOrder() {

return (Order) jms.receiveAndConvert("tacocloud.order.queue");

}

}

Эта новая версия JmsOrderReceiver имеет метод receieveOrder (), который был сокращен до одной строки. И вам больше не нужно внедрять MessageConverter, потому что все преобразования сообщений будут выполняться под капотом в receiveAndConvert ().

Прежде чем двигаться дальше, давайте рассмотрим, как receiveOrder() может быть использован в кухонном приложении Taco Cloud. Повар на одной из кухонь Taco Cloud может нажать кнопку или предпринять какие-либо действия, чтобы указать, что они готовы начать создание тако. В этот момент будет вызван receiveOrder(), и вызов метода receive() или receiveAndConvert() будет заблокирован. Больше ничего не произойдет, пока сообщение о заказе не будет готово. Как только заказ поступит, он будет возвращен функцией receiveOrder(), и его информация будет использоваться для отображения деталей заказа, чтобы повар мог приступить к работе. Это кажется естественным выбором для модели тянуть (pull).

Теперь давайте посмотрим, как работает push-модель, объявив JMS-слушатель.

ОБЪЯВЛЕНИЕ ЛИСТЕНЕРА СООБЩЕНИЙ

В отличие от модели извлечения (pull), где для получения сообщения требовался явный вызов метода receive() или receiveAndConvert(), листенер сообщений является пассивным компонентом, который простаивает до получения сообщения.

Чтобы создать листенер сообщений, который реагирует на сообщения JMS, вы просто должны аннотировать метод в компоненте с помощью @JmsListener. В следующем листинге показан новый компонент OrderListener, который пассивно прослушивает сообщения, а не активно запрашивает их.

Листинг 8.4. Компонент OrderListener, который прослушивает заказы

package tacos.kitchen.messaging.jms.listener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;

@Component

public class OrderListener {

private KitchenUI ui;

@Autowired

public OrderListener(KitchenUI ui) {

this.ui = ui;

}

@JmsListener(destination = "tacocloud.order.queue")

public void receiveOrder(Order order) {

ui.displayOrder(order);

}

}

Метод receiveOrder () аннотирован JmsListener для «прослушивания» сообщений в месте назначения tacocloud.order.queue. Он не имеет отношения к JmsTemplate и не вызывается явно кодом вашего приложения. Вместо этого, каркасный код в Spring ожидает поступления сообщений в указанный пункт назначения, и когда они приходят, метод receiveOrder() вызывается автоматически с полезной нагрузкой Order в качестве параметра.

Во многих отношениях аннотация @JmsListener похожа на одну из аннотаций сопоставления запросов Spring MVC, например, @GetMapping или @PostMapping. В Spring MVC методы, аннотированные одним из методов отображения запросов, реагируют на запросы по указанному пути. Точно так же методы, аннотированные @JmsListener, реагируют на сообщения, поступающие в пункт назначения.

Слушатели сообщений часто советуются как лучший выбор, потому что они не блокируют и могут быстро обрабатывать несколько сообщений. Однако в контексте приложения Taco Cloud они, возможно, не лучший выбор. Повара являются существенным узким местом в системе и могут не иметь возможности готовить тако так быстро, как поступают заказы. Повар может наполовину выполнить заказ, когда на экране отображается новый заказ. Пользовательский интерфейс кухни должен был бы буферизовать заказы по мере их поступления, чтобы не перегружать кухонный персонал.

Нельзя сказать, что слушатели сообщений плохие. Напротив, они идеально подходят для быстрой обработки сообщений. Но когда обработчики сообщений должны иметь возможность запрашивать бОльшую часть сообщений в свое время, модель извлечения, предлагаемая JmsTemplate, кажется более подходящей.

Поскольку JMS определяется стандартной спецификацией Java и поддерживается многими реализациями брокера сообщений, это обычный выбор для обмена сообщениями в Java. Но у JMS есть несколько недостатков, не последним из которых является то, что в качестве спецификации Java его использование ограничено приложениями Java. Новые опции обмена сообщениями, такие как RabbitMQ и Kafka, устраняют эти недостатки и доступны для других языков и платформ, помимо JVM. Давайте отложим JMS и посмотрим, как вы могли бы реализовать обмен сообщениями о заказах тако с RabbitMQ.

8.2 Работа с RabbitMQ и AMQP

Как, возможно, самая известная реализация AMQP, RabbitMQ предлагает более продвинутую стратегию маршрутизации сообщений, чем JMS. Принимая во внимание, что сообщения JMS адресуются с именем пункта назначения, из которого получатель получит их, сообщения AMQP адресуются с именем обмена и ключом маршрутизации, которые отделены от очереди, которую прослушивает получатель. Эта связь между обменом и очередями показана на рисунке 8.1.

Рис. 8.1. Сообщения, отправляемые на обмен RabbitMQ, направляются в одну или несколько очередей на основе ключей маршрутизации и привязок.

Когда сообщение приходит к брокеру RabbitMQ, оно отправляется на тот обмен (exchange), для которого оно было адресовано. Exchange отвечает за маршрутизацию сообщения в одну или несколько очередей в зависимости от типа exchange, привязки между exchange и очередями (queue) и значения ключа маршрутизации сообщения.

Существует несколько видов обмена, включая следующие:

Default — обмен, который автоматически создается брокером. Он направляет сообщения в очереди, чье имя совпадает с ключом маршрутизации сообщения. Все очереди по умолчанию привязаны к default exchange.

Direct - направляет сообщения в очередь, ключ привязки которой совпадает с ключом маршрутизации сообщения.

Topic - направляет сообщение в одну или несколько очередей, в которых ключ привязки (который может содержать символы подстановки) соответствует ключу маршрутизации сообщения.

Fanout - маршрутизирует сообщения во все связанные очереди без учета ключей привязки или ключей маршрутизации.

Headers - аналогичны обмену topic, за исключением того, что маршрутизация основана на значениях заголовков сообщений, а не на ключах маршрутизации.

Dead letter - универсальное сообщение для всех сообщений, которые невозможно доставить (это означает, что они не соответствуют какой-либо определенной привязке обмена к очереди).

Простейшие формы обмена это Default и Fanout, так как они примерно соответствуют очереди и теме JMS. Но другие типы обмена позволяют задать более гибкие схемы маршрутизации.

Самая важная вещь для понимания - это то, что сообщения отправляются на обменники (exchanges) с ключами маршрутизации, и они берутся из очередей. То, как они попадают из обмена в очередь, зависит от определений привязки и того, что лучше всего подходит для ваших сценариев использования.

Какой тип обмена вы используете и как вы определяете привязки между обменами и очередями, мало влияет на то, как сообщения отправляются и принимаются в ваших приложениях Spring. Поэтому мы сосредоточимся на том, как написать код, который отправляет и получает сообщения с помощью Rabbit.

П р и м е ч а н и е - Более подробное обсуждение того, как лучше связать очереди с обменниками , см. RabbitMQ in Action Alvaro Videla and Jason J.W. Williams (Manning, 2012).

8.2.1 Добавление RabbitMQ в Spring

Прежде чем вы сможете начать отправку и получение сообщений RabbitMQ с помощью Spring, вам нужно добавить в свою сборку зависимость AMQP-стартера Spring Boot вместо Artemis или ActiveMQ-стартера, которые вы добавили в предыдущем разделе:

org.springframework.boot

spring-boot-starter-amqp

Добавление стартера AMQP в вашу сборку вызовет автоконфигурацию, которая создаст фабрику соединений AMQP и компоненты RabbitTemplate, а также другие вспомогательные компоненты. Просто добавьте эту зависимость, чтобы начать отправку и получение сообщений от брокера RabbitMQ с помощью Spring. Но есть несколько полезных свойств, о которых вы захотите узнать, перечисленных в таблице 8.4.

Таблица 8.4 Свойства для настройки местоположения и учетных данных RabbitMQ брокера

spring.rabbitmq.addresses - Разделенный запятыми список адресов брокера RabbitMQ

spring.rabbitmq.host - Хост брокера (по умолчанию localhost)

spring.rabbitmq.port - Порт брокера (по умолчанию 5672)

spring.rabbitmq.username - Имя пользователя для доступа к брокеру (необязательно)

spring.rabbitmq.password - Пароль пользователя для доступа к брокеру (необязательно)

В целях разработки у вас, вероятно, будет брокер RabbitMQ, для которого не требуется проверка подлинности на вашем локальном компьютере с слушателем порта 5672. Эти свойства, скорее всего, не будут особенно полезны, пока вы еще в разработке, но они без сомнения, окажутся полезными, когда ваши приложения будут запущены в продакшен.

Например, предположим, что при переходе в продакшен ваш брокер RabbitMQ находится на сервере с именем rabbit.tacocloud.com, прослушивает порт 5673 и требует учетных данных. В этом случае следующая конфигурация в вашем файле application.yml установит эти свойства, когда профиль prod активен:

spring:

profiles: prod

rabbitmq:

host: rabbit.tacocloud.com

port: 5673

username: tacoweb

password: l3tm31n

Теперь, когда RabbitMQ настроен в вашем приложении, пришло время начать отправку сообщений с RabbitTemplate.

8.2.2 Отправка сообщений с RabbitTemplate

В основе поддержки обмена сообщениями RabbitMQ от Spring лежит RabbitTemplate. RabbitTemplate похож на JmsTemplate, предлагая аналогичный набор методов. Однако, как вы увидите, есть некоторые тонкие различия, которые соответствуют уникальному способу работы RabbitMQ.

Что касается отправки сообщений с помощью RabbitTemplate, методы send() и convertAndSend() аналогичны методам с такими же именами из JmsTemplate. Но в отличие от методов JmsTemplate, которые направляют сообщения только в определенную очередь или тему, методы RabbitTemplate отправляют сообщения в с точки зрения обмена и ключей маршрутизации. Вот несколько наиболее важных методов отправки сообщений с RabbitTemplate (эти методы определены в AmqpTemplate, интерфейсе, реализованном RabbitTemplate.):

// Отправка необработанных сообщений

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

// Отправлять сообщения, преобразованные из объектов

void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

// Отправка сообщений, преобразованных из объектов с последующей обработкой (post-processing)

void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;

void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

Как видите, эти методы следуют шаблону, аналогичному их близнецам в JmsTemplate. Первые три метода send() отправляют необработанный объект Message. Следующие три метода convertAndSend() принимают объект, который будет преобразован в Message за кулисами перед отправкой. Последние три метода convertAndSend() похожи на предыдущие три, но они принимают MessagePostProcessor, который можно использовать для манипулирования объектом Message до его отправки брокеру.

Эти методы отличаются от своих аналогов JmsTemplate тем, что они принимают значения String, чтобы указать ключ обмена и маршрутизации, а не имя назначения (или Destination назначения). Методы, которые не принимают exchange, будут отправлять свои сообщения в exchange по умолчанию. Аналогично, методы, которые не принимают ключ маршрутизации, будут маршрутизировать свои сообщения с ключом маршрутизации по умолчанию.

Давайте включим RabbitTemplate для работы с отправкой тако-заказов. Один из способов сделать это - использовать метод send(), как показано в листинге 8.5. Но прежде чем вы сможете вызвать send(), вам нужно преобразовать объект Order в a Message. Это может быть утомительной работой, если бы не тот факт, что RabbitTemplate делает свой конвертер сообщений доступным с помощью метода getMessageConverter().

Листинг 8.5 Отправка сообщений с RabbitTemplate.send()

package tacos.messaging;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageProperties;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.support.converter.MessageConverter;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import tacos.Order;

@Service

public class RabbitOrderMessagingService implements OrderMessagingService {

private RabbitTemplate rabbit;

@Autowired

public RabbitOrderMessagingService(RabbitTemplate rabbit) {

this.rabbit = rabbit;

}

public void sendOrder(Order order) {

MessageConverter converter = rabbit.getMessageConverter();

MessageProperties props = new MessageProperties();

Message message = converter.toMessage(order, props);

rabbit.send("tacocloud.order", message);

}

}

Если у вас есть MessageConverter, конвертировать Order в Message становится просто. Вы должны задать любые свойства сообщения с MessageProperties, но если вам не нужно устанавливать какие-либо такие свойства, то подойдет экземпляр MessageProperties по умолчанию. Затем все, что осталось - это вызвать send(), передав ключ обмена и маршрутизации (оба из которых являются необязательными) вместе с сообщением. В этом примере вы указываете только ключ маршрутизации - tacocloud.order - вместе с сообщением, поэтому будет использоваться exchange (обмен) по умолчанию.

Говоря об default exchange, именем обмена по умолчанию является «» (пустая строка), что соответствует default exchange, автоматически создаваемому брокером RabbitMQ. Точно так же ключом маршрутизации по умолчанию является «» (маршрутизация которого зависит от рассматриваемого обмена и привязок (exchange and binding)). Вы можете переопределить эти значения по умолчанию, установив свойства spring.rabbitmq.template.exchange и spring.rabbitmq.template.routing-key:

spring:

rabbitmq:

template:

exchange: tacocloud.orders

routing-key: kitchens.central

В этом случае все сообщения, отправленные без указания exchange, будут автоматически отправлены на exchange, имя которого - tacocloud.orders. Если ключ маршрутизации также не указан в вызове send() или convertAndSend(), у сообщений будет ключ маршрутизации kitchens.central.

Создать объект Message из конвертера сообщений достаточно просто, но еще проще использовать convertAndSend(), чтобы RabbitTemplate обрабатывал всю работу по преобразованию за вас:

public void sendOrder(Order order) {

rabbit.convertAndSend("tacocloud.order", order);

}

НАСТРОЙКА ПРЕОБРАЗОВАТЕЛЯ СООБЩЕНИЙ

По умолчанию преобразование сообщений выполняется с помощью SimpleMessageConverter, который может преобразовывать простые типы (например, String) и объекты Serializable в объекты Message. Но Spring предлагает несколько конвертеров сообщений для RabbitTemplate, включая следующие:

Jackson2JsonMessageConverter—Ковертирует объекты В и ИЗ JSON используя Jackson 2 JSON processor

MarshallingMessageConverter—Конвертирует используя Spring Marshaller и Unmarshaller

SerializerMessageConverter—Ковертирует String aи нативные объекты любого типа используя Spring-овый Serializer и Deserializer абстракции

SimpleMessageConverter - преобразует строки, байтовые массивы и Serializable типы.

ContentTypeDelegatingMessageConverter - делегирует другому MessageConverter на основе заголовка contentType.

MessagingMessageConverter - делегирует базовый MessageConverter для преобразования сообщений и AmqpHeaderConverter для заголовков.

Если вам нужно изменить конвертер сообщений, все, что вам нужно сделать, это настроить bean-компонент типа MessageConverter. Например, для преобразования сообщений на основе JSON вы можете настроить конвертер Jackson2JsonMessageConver следующим образом:

@Bean

public MessageConverter messageConverter() {

return new Jackson2JsonMessageConverter();

}

Автоконфигурация Spring Boot обнаружит этот bean-компонент и вставит его в RabbitTemplate вместо конвертера сообщений по умолчанию.

НАСТРОЙКА СВОЙСТВ СООБЩЕНИЙ

Как и в случае с JMS, вам может потребоваться установить некоторые заголовки в отправляемых вами сообщениях. Например, допустим, вам нужно отправить X_ORDER_SOURCE для всех заказов, представленных через веб-сайт Taco Cloud. При создании ваших собственных объектов Message вы можете установить заголовок через экземпляр MessageProperties, который вы передаете конвертеру сообщений. Возвращаясь к методу sendOrder() из листинга 8.5, вам понадобится всего одна дополнительная строка кода для установки заголовка:

public void sendOrder(Order order) {

MessageConverter converter = rabbit.getMessageConverter();

MessageProperties props = new MessageProperties();

props.setHeader("X_ORDER_SOURCE", "WEB");

Message message = converter.toMessage(order, props);

rabbit.send("tacocloud.order", message);

}

Однако при использовании convertAndSend() у вас нет быстрого доступа к объекту MessageProperties. В этом вам может помочь MessagePostProcessor:

@Override

public void sendOrder(Order order) {

rabbit.convertAndSend("tacocloud.order.queue", order,

new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message)

throws AmqpException {

MessageProperties props = message.getMessageProperties();

props.setHeader("X_ORDER_SOURCE", "WEB");

return message;

}

});

}

Здесь вы предоставляете convertAndSend() анонимной внутренней реализацией MessagePostProcessor. В методе postProcessMessage() вы извлекаете свойства MessageProperties из сообщения и затем вызываете setHeader(), чтобы установить заголовок X_ORDER_SOURCE.

Теперь, когда вы увидели, как отправлять сообщения с помощью RabbitTemplate, давайте переключимся на код, который получает сообщения из очереди RabbitMQ.

8.2.3 Получение сообщения от RabbitMQ

Вы видели, что отправка сообщений с помощью RabbitTemplate не сильно отличается от отправки сообщений с помощью JmsTemplate. И, как выясннится, получение сообщений из очереди RabbitMQ не сильно отличается от JMS.

Как и в случае с JMS, у вас есть два варианта:

-Получение сообщений из очереди с помощью RabbitTemplate

-Отправка сообщений в методе, аннотированным @RabbitListener.

Давайте начнем с рассмотрения pull-овского метода RabbitTemplate.receive(), основанного на извлечении.

ПОЛУЧЕНИЕ СООБЩЕНИЙ С RABBITTEMPLATE

RabbitTemplate поставляется с несколькими методами для извлечения сообщений из очереди. Несколько наиболее полезных из них перечислены здесь:

// Получение сообщений

Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

Message receive(String queueName, long timeoutMillis) throws AmqpException;

// Получать объекты, преобразованные из сообщений

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

Object receiveAndConvert(long timeoutMillis) throws AmqpException;

Object receiveAndConvert(String queueName, long timeoutMillis) throws

AmqpException;

// Получать типобезопасные объекты, преобразованные из сообщений

T receiveAndConvert(ParameterizedTypeReference type) throws

AmqpException;

T receiveAndConvert(String queueName, ParameterizedTypeReference type)

throws AmqpException;

T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference

type) throws AmqpException;

T receiveAndConvert(String queueName, long timeoutMillis,

ParameterizedTypeReference type) throws AmqpException;

Эти методы являются зеркальными изображениями методов send() и convertAndSend(), описанных ранее. Принимая во внимание, что send() используется для отправки необработанных объектов Message, а receive() получает необработанные объекты Message из очереди. Аналогично, receiveAndConvert() получает сообщения и использует конвертер сообщений для преобразования их в объекты домена перед их возвратом.

Но есть несколько очевидных различий в сигнатурах методов. Во-первых, ни один из этих методов не принимает ключ обмена или маршрутизации в качестве параметра. Это связано с тем, что обмены и ключи маршрутизации используются для маршрутизации сообщений в очереди, но как только сообщения находятся в очереди, их следующим пунктом назначения является потребитель, который вытаскивает их из очереди. Приложения-потребители не должны заниматься обменом или маршрутизацией ключей. Очередь - это единственное, что нужно знать приложениям-потребителям.

Вы также заметите, что многие методы принимают long параметр, указывающий тайм-аут для получения сообщений. По умолчанию время ожидания приема составляет 0 миллисекунд. То есть вызов метода receive() будет возвращен немедленно, потенциально с null значением, если сообщения недоступны. Это явное отличие от того, как ведут себя методы receive() в JmsTemplate. Передав значение тайм-аута, методы receive() и receiveAndConvert() блокируются до прибытия сообщения или до истечения времени ожидания. Но даже при ненулевом таймауте ваш код должен быть готов к null возврату.

Давайте посмотрим, как вы можете применить это на практике. Следующий листинг показывает новую реализацию OrderReceiver на основе Rabbit, которая использует RabbitTemplate для получения заказов.

Листинг 8.6. Получение заказов от RabbitMQ с помощью RabbitTemplate

package tacos.kitchen.messaging.rabbit;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.support.converter.MessageConverter;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

@Component

public class RabbitOrderReceiver {

private RabbitTemplate rabbit;

private MessageConverter converter;

@Autowired

public RabbitOrderReceiver(RabbitTemplate rabbit) {

this.rabbit = rabbit;

this.converter = rabbit.getMessageConverter();

}

public Order receiveOrder() {

Message message = rabbit.receive("tacocloud.orders");

return message != null ? (Order) converter.fromMessage(message) : null;

}

}

Метод receiveOrder() - это место, где происходит все действие. Он вызывает метод receive() на внедренном шаблоне RabbitTemplate для получения заказа из очереди tacocloud.orders. Он не предоставляет значения времени ожидания, поэтому вы можете только предполагать, что вызов немедленно возвращается либо с Message, либо с null значением. Если Message возвращается, вы используете MessageConverter из шаблона RabbitTemplate для преобразования Message в Order. Иначе, если receive() возвращает null, вы возвращаете null.

В зависимости от варианта использования вы можете установить небольшую задержку. Например, на верхнем дисплее кухни Taco Cloud вы можете подождать некоторое время, если нет заказов. Допустим, вы решили подождать до 30 секунд, прежде чем сдаться. Метод receiveOrder() может быть изменен для передачи 30 000 миллисекундной задержки для receive():

public Order receiveOrder() {

Message message = rabbit.receive("tacocloud.order.queue", 30000);

return message != null ? (Order) converter.fromMessage(message) : null;

}

Если вы похожи на меня, то, увидев такой захардкоженный элемент, вы почувствуете дискомфорт. Вы можете подумать, что было бы неплохо создать аннотированный класс @ConfigurationProperties, чтобы можно было настроить это время ожидания с помощью свойства конфигурации Spring Boot. Я бы согласился с вами, если бы не тот факт, что Spring Boot уже предлагает такое свойство конфигурации. Если вы хотите установить время ожидания с помощью конфигурации, просто удалите значение времени ожидания в вызове receive() и установите его в своей конфигурации с помощью свойства spring.rabbitmq.template.receive-timeout:

spring:

rabbitmq:

template:

receive-timeout: 30000

Вернемся к методу receiveOrder(), обратите внимание, что вам пришлось использовать конвертер сообщений из RabbitTemplate для преобразования входящего объекта Message в объект Order. Но если RabbitTemplate использует конвертер сообщений, почему он не может выполнить преобразование для вас? Именно для этого и предназначен метод receiveAndConvert(). Используя receiveAndConvert(), вы можете переписать receiveOrder() следующим образом:

public Order receiveOrder() {

return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");

}

Это намного проще, не так ли? Единственная тревожная вещь, которую я вижу, это приведение Object к Order. Однако есть альтернатива. Вместо этого вы можете передать ParameterizedTypeReference в receiveAndConvert() для непосредственного получения объекта Order:

public Order receiveOrder() {

return rabbit.receiveAndConvert("tacocloud.order.queue", new ParameterizedTypeReference() {});

}

Спорный вопрос, чем это лучше, но это более типобезопасный подход. Единственное требование для использования ParameterizedTypeReference с receiveAndConvert() состоит в том, что конвертер сообщений должен быть реализацией SmartMessageConverter; Jackson2JsonMessageConverter - единственная готовая реализация на выбор.

Модель pull, предлагаемая JmsTemplate, подходит для многих случаев использования, но часто лучше иметь код, который прослушивает сообщения и который вызывается при поступлении сообщений. Давайте посмотрим, как вы можете создавать управляемые сообщениями bean-компоненты, которые отвечают на сообщения RabbitMQ.

ОБРАБОТКА СООБЩЕНИЙ RABBITMQ СО СЛУШАТЕЛЯМИ

Для управляемых сообщениями компонентов RabbitMQ Spring предлагает RabbitListener, аналог RabbitMQ для JmsListener. Чтобы указать, что метод должен вызываться при поступлении сообщения в очередь RabbitMQ, аннотируйте метод компонента с помощью @RabbitTemplate.

Например, в следующем листинге показана реализация OrderReceiver в RabbitMQ, аннотированная для прослушивания сообщений заказа, а не для их запроса с помощью RabbitTemplate.

Листинг 8.7 объявление метода в качестве листенера сообщений RabbitMQ

package tacos.kitchen.messaging.rabbit.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

@Component

public class OrderListener {

private KitchenUI ui;

@Autowired

public OrderListener(KitchenUI ui) {

this.ui = ui;

}

@RabbitListener(queues = "tacocloud.order.queue")

public void receiveOrder(Order order) {

ui.displayOrder(order);

}

}

Вы без сомнения заметите, что это очень похоже на код из листинга 8.4. Действительно, единственное, что изменилось, это аннотация слушателя - вместо @JmsListener используется @RabbitListener. Каким бы замечательным ни был @RabbitListener, это почти дублирование кода оставляет мне мало что сказать о @RabbitListener, чего я еще не говорил о @JmsListener. Они оба отлично подходят для написания кода, который отвечает на сообщения, отправляемые им соответствующими брокерами - брокером JMS для @JmsListener и брокером RabbitMQ для @RabbitListener.

Хотя вы можете чувствовать отсутствие энтузиазма в отношении @RabbitListener в предыдущем абзаце, будьте уверены, что я не специально. По правде говоря, тот факт, что @RabbitListener работает так же, как @JmsListener, на самом деле очень интересен! Это означает, что вам не нужно изучать совершенно другую модель программирования при работе с RabbitMQ против Artemis или ActiveMQ. То же самое справедливо для сходства между RabbitTemplate и JmsTemplate.

Завершая эту главу, рассмотрим еще один вариант обмена сообщениями, поддерживаемый Spring: Apache Kafka.

8.3 Обмен сообщениями с Kafka

Apache Kafka - это новейший вариант обмена сообщениями, который мы рассмотрим в этой главе. На первый взгляд, Kafka - это брокер сообщений, такой же, как ActiveMQ, Artemis или Rabbit. Но у Кафки есть несколько уникальных козырей в рукавах.

Kafka предназначена для работы в кластере, обеспечивая отличную масштабируемость. И разделяя свои темы по всем экземплярам в кластере добивается особой устойчивости. Принимая во внимание, что RabbitMQ имеет дело главным образом с очередями на слушателях, Kafka использует темы только для обмена сообщениями в pub/sub - сообщениях.

Темы Кафки распространяются на всех брокеров в кластере. Каждый узел в кластере выступает в качестве лидера для одной или нескольких тем, отвечая за данные этой темы и реплицируя их на другие узлы в кластере.

Далее, каждая тема может быть разделена на несколько разделов. В этом случае каждый узел в кластере является лидером для одного или нескольких разделов темы, но не для всей темы. Ответственность за тему распределена по всем узлам. Рисунок 8.2 иллюстрирует, как это работает.

Рис. 8.2. Кластер Kafka состоит из нескольких брокеров, каждый из которых выступает в роли лидера по разделам тем.

Kafka обладает уникальной архитектурой, и я призываю вас прочитать больше об этом в Kafka in Action Дилана Скотта (Manning, 2017). Для наших целей мы сосредоточимся на том, как отправлять сообщения и получать их от Kafka с помощью Spring.

8.3.1 Настройка Spring для обмена сообщениями Kafka

Чтобы начать использовать Kafka для обмена сообщениями, вам нужно добавить соответствующие зависимости в вашу сборку. Однако, в отличие от параметров JMS и RabbitMQ, для Kafka нет стартового Spring Boot. Но не бойтесь; вам понадобится только одна зависимость:

org.springframework.kafka

spring-kafka

Эта зависимость вносит все, что нужно для Кафки в проект. Более того, её присутствие инициирует автоконфигурацию Spring Boot для Kafka, которая, помимо прочего, организует KafkaTemplate в контексте приложения Spring. Все, что вам нужно сделать, это заинжектить KafkaTemplate и приступить к отправке и получению сообщений.

Однако прежде чем приступить к отправке и получению сообщений, вы должны знать о некоторых свойствах, которые пригодятся при работе с Kafka. В частности, KafkaTemplate по умолчанию работает с брокером Kafka на локальном хосте, прослушивая порт 9092. Это нормально, если брокер Kafka запускается локально при разработке приложения, но когда придет время перейти к продакшену, вам потребуется настроить другой хост и порт.

Свойство spring.kafka.bootstrap-servers задает расположение одного или нескольких серверов Kafka, используемых для установления начального соединения с кластером Kafka. Например, если один из серверов Kafka в кластере работает kafka.tacocloud.com и прослушивая порт 9092, вы можете настроить его местоположение в YAML следующим образом:

spring:

kafka:

bootstrap-servers:

- kafka.tacocloud.com:9092

Но обратите внимание spring.kafka.bootstrap-servers - является множественным числом и принимает список. Таким образом, вы можете предоставить ему несколько серверов Kafka в кластере:

spring:

kafka:

bootstrap-servers:

- kafka.tacocloud.com:9092

- kafka.tacocloud.com:9093

- kafka.tacocloud.com:9094

С настройкой Kafka в вашем проекте вы готовы отправлять и получать сообщения. Начнем с отправки объектов Order в Kafka с помощью KafkaTemplate.

8.3.2 Отправка сообщений с помощью KafkaTemplate

Во многих отношениях KafkaTemplate похож на своих аналогов в JMS и RabbitMQ. В то же время, это совсем другое. Это становится очевидным, когда мы рассмотрим методы отправки сообщений:

ListenableFuture> send(String topic, V data);

ListenableFuture> send(String topic, K key, V data);

ListenableFuture> send(String topic, Integer partition, K key, V data);

ListenableFuture> send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture> send(ProducerRecord record);

ListenableFuture> send(Message message);

ListenableFuture> sendDefault(V data);

ListenableFuture> sendDefault(K key, V data);

ListenableFuture> sendDefault(Integer partition, K key, V data);

ListenableFuture> sendDefault(Integer partition, Long timestamp, K key, V data);

Первое, что вы могли заметить, это то, что нет методов convertAndSend(). Это потому, что KafkaTemplate написана с помощью дженериков и может работать с типами доменов непосредственно при отправке сообщений. В некотором смысле, все методы send() выполняют функцию convertAndSend ().

Вы также могли заметить, что есть несколько параметров send() и sendDefaul (), которые сильно отличаются от тех, которые вы использовали с JMS и Rabbit. При отправке сообщений в Kafka вы можете указать следующие параметры, которые будут определять способ отправки сообщения:

-topic - тема для отправки сообщения (требуется для send())

-partition - раздел для записи topic (необязательно)

-key - ключ для отправки на запись (необязательно)

-timestamp - время (необязательно; по умолчанию используется System.currentTimeMillis())

-полезные данные (полезная нагрузка) (обязательно)

Тема и полезные данные являются двумя наиболее важными параметрами. Разделы и ключи мало влияют на то, как вы используете KafkaTemplate, кроме дополнительной информации, предоставляемой в качестве параметров для send() и sendDefault(). Для наших целей мы собираемся сосредоточиться на отправке полезной нагрузки сообщения в заданную тему и не беспокоиться о разделах и ключах.

Для метода send() вы также можете выбрать отправку ProducerRecord, который является немногим больше, чем тип, который содержит все предыдущие параметры в одном объекте. Вы также можете отправить объект Message, но для этого потребуется преобразовать ваши доменные объекты в Message. Как правило, проще использовать один из других методов, чем создавать и отправлять объект ProducerRecord или Message.

Используя KafkaTemplate и его метод send(), вы можете написать реализацию OrderMessagingService на основе Kafka. Следующий листинг показывает, как может выглядеть такая реализация.

Листинг 8.8. Отправка заказов с помощью KafkaTemplate

package tacos.messaging;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Service;

@Service

public class KafkaOrderMessagingService implements OrderMessagingService {

private KafkaTemplate kafkaTemplate;

@Autowired

public KafkaOrderMessagingService(

KafkaTemplate kafkaTemplate) {

this.kafkaTemplate = kafkaTemplate;

}

@Override

public void sendOrder(Order order) {

kafkaTemplate.send("tacocloud.orders.topic", order);

}

}

В этой новой реализации OrderMessagingService метод sendOrder() использует метод send() внедренного KafkaTemplate для отправки Order в тему с именем tacocloud.orders.topic. За исключением слова «Кафка», разбросанного по коду, оно не сильно отличается от кода, который вы написали для JMS и Rabbit.

Если задать тему по умолчанию, можно немного упростить метод sendOrder(). Во-первых, установить тему по умолчанию в tacocloud.orders.topic, установив свойство spring.kafka.template.default-topic:

spring:

kafka:

template:

default-topic: tacocloud.orders.topic

Затем в методе sendOrder() вы можете вызвать sendDefault() вместо send() и не указывать имя темы:

@Override

public void sendOrder(Order order) {

kafkaTemplate.sendDefault(order);

}

Теперь, когда ваш код для отправки сообщений был написан, давайте обратим наше внимание на написание кода, который будет получать эти сообщения от Kafka.

8.3.3 Написание Kafka листинеров

Помимо уникальных сигнатур методов для send() и sendDefault(), KafkaTemplate отличается от JmsTemplate и RabbitTemplate тем, что не предлагает никаких методов для получения сообщений. Это означает, что единственный способ использовать сообщения из темы Kafka с помощью Spring - написать листинер сообщений.

Для Kafka листинеры сообщений определяются как методы, аннотированные @KafkaListener. Аннотация @KafkaListener примерно аналогична @JmsListener и @RabbitListener и используется практически одинаково. В следующем листинге показано, как может выглядеть получатель заказа на основе слушателя, если он написан для Kafka.

Листинг 8.9. Получение заказов с помощью @KafkaListener

package tacos.kitchen.messaging.kafka.listener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

import tacos.Order;

import tacos.kitchen.KitchenUI;

@Component

public class OrderListener {

private KitchenUI ui;

@Autowired

public OrderListener(KitchenUI ui) {

this.ui = ui;

}

@KafkaListener(topics="tacocloud.orders.topic")

public void handle(Order order) {

ui.displayOrder(order);

}

}

Метод handle() имеет аннотацию @KafkaListener, чтобы указать, что его следует вызывать при поступлении сообщения в теме с именем tacocloud.orders.topic. Как написано в листинге 8.9, для handle() предоставляется только Order (полезная нагрузка). Но если вам нужны дополнительные метаданные из сообщения, он также может принять объект ConsumerRecord или Message.

Например, следующая реализация handle() принимает ConsumerRecord, чтобы вы могли регистрировать раздел и метку времени сообщения:

@KafkaListener(topics="tacocloud.orders.topic")

public void handle(Order order, ConsumerRecord record) {

log.info("Received from partition {} with timestamp {}",

record.partition(), record.timestamp());

ui.displayOrder(order);

}

Точно так же вы можете запросить Message вместо ConsumerRecord и достичь того же:

@KafkaListener(topics="tacocloud.orders.topic")

public void handle(Order order, Message message) {

MessageHeaders headers = message.getHeaders();

log.info("Received from partition {} with timestamp {}",

headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)

headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));

ui.displayOrder(order);

}

Стоит отметить, что полезная нагрузка сообщения также доступна через ConsumerRecord.value() или Message.getPayload(). Это означает, что вы можете запросить Order через эти объекты вместо того, чтобы запрашивать его напрямую как параметр handle().

ИТОГ

-Асинхронный обмен сообщениями обеспечивает уровень перенаправления между взаимодействующими приложениями, что обеспечивает более слабую связь и большую масштабируемость.

-Spring поддерживает асинхронный обмен сообщениями с JMS, RabbitMQ или Apache Kafka.

-Приложения могут использовать основанные на шаблонах клиенты (JmsTemplate, RabbitTemplate или KafkaTemplate) для отправки сообщений через брокер сообщений.

-Приложения могут использовать сообщения в модели на основе запросов, используя те же клиенты на основе шаблонов.

-Сообщения также можно передавать потребителям, применяя аннотации листенеров сообщений(@JmsListener, @RabbitListener или @KafkaListener) к методам bean.

Spring in Action Covers Spring 5.0 перевод на русский. Глава 9

9. Integrating Spring

Эта глава охватывает

-Обработка данных в режиме реального времени

-Определение интеграционных потоков

-Использование определения JAVA DSL Spring Integration

-Интеграция с электронной почтой, файловыми системами и другими внешними системами

Одна из самых неприятных вещей, с которыми я сталкиваюсь во время путешествия - это длительный перелет и плохое или несуществующее интернет-соединение в полете. Мне нравится использовать свое свободное время, чтобы выполнить некоторую работу, включая написание страниц этой книги. Если нет сетевого подключения, я в невыгодном положении, если мне нужно получить библиотеку или найти Java Doc, и я не могу выполнить большую часть работы. Я научился брать с собой книгу для чтения в таких случаям.

Точно так же, как нам необходимо подключиться к Интернету для продуктивной работы, многие приложения должны подключаться к внешним системам для выполнения своей работы. Приложению может потребоваться читать или отправлять электронные письма, взаимодействовать с внешним API или использовать данные, содержащиеся в базе данных. И поскольку данные поступают из этих внешних систем или записываются в эти внешние системы, приложению может потребоваться каким-то образом обрабатывать данные, чтобы перевести их в собственный домен приложения или из него.

В этой главе вы узнаете, как использовать общие шаблоны интеграции Spring Integration. Spring Integration - это готовая к использованию реализация многих шаблонов интеграции, которые каталогизированы в Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf (Addison-Wesley, 2003). Каждый шаблон реализован как компонент, через который сообщения передают данные в конвейер. Используя конфигурацию Spring, вы можете собрать эти компоненты в конвейер, по которому проходят потоки данных. Давайте начнем с определения простого потока интеграции, который познакомит вас со многие функциями и характеристиками работы с Spring Integration.

9.1 Объявление простого потока интеграции

Вообще говоря, Spring Integration позволяет создавать интеграционные потоки, через которые приложение может получать или отправлять данные на некоторый ресурс, внешний по отношению к самому приложению. Одним из таких ресурсов, с которым приложение может интегрироваться, является файловая система. Поэтому среди многих компонентов Spring Integration есть канальные адаптеры для чтения и записи файлов.

Чтобы получить удовольствие от Spring Integration, вы создадите поток интеграции, который записывает данные в файловую систему. Для начала вам нужно добавить Spring Integration в ваш проект. Для Maven необходимы следующие зависимости:

org.springframework.boot

spring-boot-starter-integration

org.springframework.integration

spring-integration-file

Первая зависимость - это стартер Spring Boot для Spring Integration. Эта зависимость важна для разработки потока Spring Integration, независимо от того, с чем он будет интегрироваться. Как и все starter зависимости Spring Boot, он доступен в виде флажка в форме Initializr.

Вторая зависимость - для файла Spring Integration endpoint модуля. Этот модуль является одним из более двух десятков endpoint модулей, используемых для интеграции с внешними системами. Мы поговорим подробнее о endpoint модулях в разделе 9.2.9. Но на данный момент известно, что endpoint модуль файлов предлагает возможность загружать файлы из файловой системы в поток интеграции и / или записывать данные из потока на файловую систему.

Затем необходимо создать способ для приложения, чтобы отправить данные в поток интеграции, так что он может быть записан в файл. Для этого вы создадите интерфейс шлюза, например, показанный ниже.

Листинг 9.1 Интерфейс шлюза сообщений для преобразования вызовов методов в сообщения

package sia5;

import org.springframework.integration.annotation.MessagingGateway;

import org.springframework.integration.file.FileHeaders;

import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel="textInChannel") //Объявляет шлюз сообщений

public interface FileWriterGateway {

void writeToFile(@Header(FileHeaders.FILENAME) String filename,String data); //Пишет в файл

}

Хотя это простой Java интерфейс, о FileWriterGateway можно многое сказать. Первое, что вы заметите, это то, что он аннотирован @MessagingGateway. Эта аннотация указывает Spring Integration на создание реализации этого интерфейса должно поисходить во время выполнения - подобно тому, как Spring Data автоматически генерирует реализации интерфейсов репозитория. Другие части кода будут использовать этот интерфейс, когда им нужно будет переписать файл.

Атрибут defaultRequestChannel @MessagingGateway указывает, что любые сообщения, полученные в результате вызова методов интерфейса, должны быть отправлены в данный канал сообщений. В этом случае вы заявляете, что любые сообщения, возникающие в результате вызова writeToFile(), должны отправляться на канал, имя которого textInChannel.

Что касается метода writeToFile(), он принимает имя файла в виде String и String, содержащий текст, который должен быть записан в файл. Что примечательно в сигнатуре этого метода, так это то, что параметр имени файла аннотирован @Header. В этом случае аннотация @Header указывает, что значение, переданное в filename, должно быть помещено в заголовок сообщения (указанный как FileHeaders.FILENAME, который разрешается в file_name), а не в полезную нагрузку сообщения. Значение параметра data, помещается в полезную нагрузку сообщения.

Теперь, когда у вас есть шлюз сообщений, вам нужно настроить интеграционный поток. Несмотря на то, что начальная зависимость Spring Integration, которую вы добавили в свою сборку, обеспечивает необходимую автоконфигурацию для Spring Integration, вы все равно должны написать дополнительные конфигурации, чтобы определить потоки, отвечающие потребностям приложения. Вот три варианта конфигурации для объявления потоков интеграции:

-XML конфигурация

-Java конфигурация

- Java конфигурация с DSL

Мы рассмотрим все три из этих стилей конфигурации для Spring Integration, начиная с устаревшей конфигурации XML.

9.1.1 Определение потоков интеграции через XML

Хотя я избегал использования конфигурации XML в этой книге, Spring Integration имеет долгую историю интеграционных потоков, определенных в XML. Поэтому я считаю, что мне стоит показать хотя бы один пример потока интеграции, определенного через XML. В следующем списке показано, как настроить пример потока через XML.

Листинг 9.2 Определение потоков интеграции через XML

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:int="http://www.springframework.org/schema/integration"

xmlns:int-file="http://www.springframework.org/schema/integration/file"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/integration

http://www.springframework.org/schema/integration/spring-integration.xsd

http://www.springframework.org/schema/integration/file

http://www.springframework.org/schema/integration/file/springintegration-file.xsd">

//Объявление textInChannel

input-channel="textInChannel"

output-channel="fileWriterChannel"

expression="payload.toUpperCase()" /> // Преобразование текста

//Объявление fileWriterChanne

channel="fileWriterChannel"

directory="/tmp/sia5/files"

mode="APPEND"

append-new-line="true" /> //Записывает текст в файл

Детализация XML в листинге 9.2:

Настаиваем канал с именем textInChannel. Это тот же канал что установлен для запроса FileWriterGateway. При вызове метода writeToFile() в FileWriterGateway, результирующее сообщение публикуется на этом канале.

Настроили преобразователь (transformer), который получает сообщения от textInChannel. Он использует выражение Spring Expression Language (SpEL) для вызова toUpperCase() в полезной нагрузке сообщения. Результат операции верхнего регистра затем публикуется в fileWriterChannel.

Настроили канал с именем fileWriterChannel. Этот канал служит проводником, который соединяет transformer с адаптером исходящего канала.

Наконец, настроили адаптер исходящего канала, используя пространство имен int-file. Это пространство имен XML предоставляется модулем Spring Integration для записи файлов. Он настроен так что получает сообщения от fileWriterChannel и записывает полезные данные сообщения в файл, имя которого указано в заголовке file_name сообщения в каталоге, указанном в атрибуте каталога. Если файл уже существует, в него будут добавлены новые данные, а не перезаписан.

Этот поток показан на рис. 9.1 с использованием графических элементов, стилизованных под Enterprise Integration Patterns.

Шлюз записи файлов - Текст в канале - Uppercase transformer - Канал записи файлов - Адаптер исходящего канала файла

Рисунок 9.1 Поток интеграции записи файлов

Если вы хотите использовать конфигурацию XML в приложении Spring Boot, вам необходимо импортировать XML как ресурс в приложение Spring. Самый простой способ сделать это - использовать аннотацию Spring @ImportResource в одном из классов конфигурации вашего приложения:

@Configuration

@ImportResource("classpath:/filewriter-config.xml")

public class FileWriterIntegrationConfig { ... }

Хотя конфигурация на основе XML хорошо послужила Spring Integration, большинство разработчиков опасаются использовать XML. (И, как я уже сказал, я избегаю конфигурации через XML в этой книге.) Давайте отложим эти угловые скобки и обратим наше внимание на стиль конфигурации через Java в Spring Integration.

9.1.2 Настройка потоков интеграции через Java

Большинство современных приложений Spring отказались от конфигурации XML в пользу конфигурации Java. Фактически в приложениях Spring Boot конфигурация Java является естественным стилем, дополняющим автоконфигурацию. Поэтому, если вы добавляете поток интеграции в приложение Spring Boot, имеет смысл определить поток посредством Java.

В качестве примера того, как написать поток интеграции используя Java конфигурацию, взгляните на следующий листинг. Это показывает тот же процесс интеграции записи файлов, что и раньше, но на этот раз он написан на Java.

package sia5;

import java.io.File; import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.integration.annotation.ServiceActivator;

import org.springframework.integration.annotation.Transformer;

import org.springframework.integration.file.FileWritingMessageHandler;

import org.springframework.integration.file.support.FileExistsMode;

import org.springframework.integration.transformer.GenericTransformer;

@Configuration

public class FileWriterIntegrationConfig {

@Bean

@Transformer(inputChannel="textInChannel", //Объявляем transformer

outputChannel="fileWriterChannel")

public GenericTransformer upperCaseTransformer() {

return text -> text.toUpperCase();

}

@Bean

@ServiceActivator(inputChannel="fileWriterChannel")

public FileWritingMessageHandler fileWriter() { //Объявляем запись в файл

FileWritingMessageHandler handler =

new FileWritingMessageHandler(new File("/tmp/sia5/files"));

handler.setExpectReply(false);

handler.setFileExistsMode(FileExistsMode.APPEND);

handler.setAppendNewLine(true);

return handler;

}

}

В Java конфигурации вы объявляете два bean-компонента: преобразователь (transformer) и обработчик сообщения для записи файла. Трансформатор является универсальным трансформатором. Поскольку GenericTransformer является функциональным интерфейсом, вы можете предоставить его реализацию в виде лямбды, которая вызывает toUpperCase() в тексте сообщения. Преобразователь bean аннотируется @Transformer, определяющим его как преобразователь в потоке интеграции, который принимает сообщения в канале с именем textInChannel и записывает сообщения в канал с именем fileWriterChannel.

Что касается bean для записи файлов, он помечается @ServiceActivator, чтобы указать, что он будет принимать сообщения от fileWriterChannel и передавать эти сообщения сервису, определенному экземпляром FileWritingMessageHandler. FileWritingMessageHandler - это обработчик сообщений, который записывает полезную нагрузку сообщения в файл в указанном каталоге, используя имя файла, указанное в заголовке file_name сообщения. Как и в примере с XML, FileWritingMessageHandler настроен на добавление в файл новой записи.

Уникальной особенностью конфигурации bean-компонента FileWritingMessageHandler является то, что существует вызов метода setExpectReply(false), который указывает, что активатору службы не следует ожидать ответный канал (канал, через который значение может быть возвращено вышестоящим компонентам в потоке). Если вы не вызываете setExpectReply(), для записи файла bean по умолчанию имеет значение true, и, хотя функция по-прежнему работает должным образом, вы увидите несколько ошибок, указывающих, что канал ответа не настроен.

Вы также заметите, что вам не нужно явно объявлять каналы. Каналы textInChannel и fileWriterChannel будут созданы автоматически, если бинов с этими именами не существует. Но если вам нужен больший контроль над конфигурацией каналов, вы можете явно создать их как bean-компоненты:

@Bean

public MessageChannel textInChannel() {

return new DirectChannel();

}

...

@Bean

public MessageChannel fileWriterChannel() {

return new DirectChannel();

}

Конфигурация посредством Java, возможно, проще для чтения и немного короче, и, безусловно, соответствует конфигурации “только Java”, о которой я рассказываю в этой книге. Но это можно сделать еще более упорядоченным с помощью стиля конфигурации Spring Integration Java DSL (предметно-ориентированного языка).

9.1.3 Использование конфигурации DSL Spring Integration

Давайте еще раз попробуем настроить поток интеграции записи файлов. На этот раз вы все равно настроите его через Java, но вы будете использовать Spring Integration Java DSL. Вместо объявления отдельного bean для каждого компонента в потоке, вы объявите один bean, который определяет весь поток.

Листинг 9.4. Предоставление свободного API для проектирования потоков интеграции

package sia5;

import java.io.File;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.integration.dsl.IntegrationFlow;

import org.springframework.integration.dsl.IntegrationFlows;

import org.springframework.integration.dsl.channel.MessageChannels;

import org.springframework.integration.file.dsl.Files;

import org.springframework.integration.file.support.FileExistsMode;

@Configuration

public class FileWriterIntegrationConfig {

@Bean

public IntegrationFlow fileWriterFlow() {

return IntegrationFlows

.from(MessageChannels.direct("textInChannel")) //Входящий канал

.transform(t -> t.toUpperCase()) //Объявление transformer

.handle(Files //Обрабатывает запись в файл

.outboundAdapter(new File("/tmp/sia5/files"))

.fileExistsMode(FileExistsMode.APPEND)

.appendNewLine(true))

.get();

}

}

Эта новая конфигурация настолько кратка, насколько это возможно, захватывая весь поток одним bean

методом. Класс IntegrationFlows инициирует builder API, из которого можно объявить поток.

В листинге 9.4 вы начинаете с получения сообщений из канала с именем textInChannel, которые затем направляются в transformer, который преобразует в верхний регистр текст полезной нагрузки сообщения. После transformer сообщения обрабатываются адаптером исходящего канала, созданным из типа Files, предоставленного в файловом модуле Spring Integration. Наконец, вызов get() создает IntegrationFlow для возврата. Вкратце, этот одиночный bean метод создает тот же поток интеграции, что и примеры конфигурации XML и Java.

Заметьте, что, как и в примере Java конфигурации, вам не нужно явно объявлять bean-компоненты канала. Несмотря на то, что вы ссылаетесь на textInChannel, он автоматически создается Spring Integration, потому что не существует никакого bean-компонента канала с таким именем. Но вы можете явно объявить bean-компонент канала, если хотите.

Что касается канала, соединяющего преобразователь (transformer) с адаптером исходящего канала, вы даже не ссылаетесь на него по имени. Если необходимо явно настроить канал, вы можете ссылаться на него по имени в определении потока с помощью вызова channel():

@Bean

public IntegrationFlow fileWriterFlow() {

return IntegrationFlows

.from(MessageChannels.direct("textInChannel"))

.transform(t -> t.toUpperCase())

.channel(MessageChannels.direct("fileWriterChannel"))

.handle(Files

.outboundAdapter(new File("/tmp/sia5/files"))

.fileExistsMode(FileExistsMode.APPEND)

.appendNewLine(true))

.get();

}

При работе с Java DSL от Spring Integration (как и с любым свободно распространяемым API) нужно помнить, что для обеспечения удобочитаемости вы должны использовать пробелы. В приведенном здесь примере я старался делать отступы для обозначения блоков связанного кода. Для более длинных и сложных потоков вы можете даже рассмотреть выделение частей потока в отдельные методы или подпотоки для лучшей читаемости.

Теперь, когда вы увидели простой поток, определенный с использованием трех разных стилей конфигурации, давайте вернемся назад и взглянем на общую картину Spring Integration.

9.2. Обзор представления Spring Integration

Spring Integration охватывает множество сценариев интеграции. Попытка включить все это в одну главу - это все равно, что поместить слона в конверт. Вместо всестороннего рассмотрения Spring Integration, я представлю фотографию слона Spring Integration, чтобы дать вам некоторое представление о том, как он работает. Затем вы создадите еще один поток интеграции, который добавит функциональность в приложение Taco Cloud.

Поток интеграции состоит из одного или нескольких компонентов. Прежде чем писать больше кода, мы кратко рассмотрим роль каждого из этих компонентов в процессе интеграции:

-Channels (Каналы) —Передача сообщений от одного элемента к другому..

-Filters (Фильтры)—Позволяет сообщениям проходить через поток при соблюдении определенных условий.

-Transformers (Трансформаторы)—Меняют значение сообщения и/или преобразуют полезные нагрузки сообщений из одного типа в другой.

-Routers (Маршрутизаторы)—Направляет сообщения на один из нескольких каналов, обычно на основе заголовков сообщений

-Splitters (Разделить)—Разделяет входящие сообщения на два или более сообщений, каждое из которых отправляется на разные каналы.

-Aggregators (Агрегаторы)— Противоположность разделителям, объединяет несколько сообщений, поступающих из отдельных каналов, в одно сообщение.

-Service activators (Активаторы службы)—Передает сообщение какому-либо методу Java для обработки, а затем опубликует возвращаемое значение в выходном канале.

-Channel adapters (Канальные адаптеры)—Подключает канал к какой-либо внешней системе или транспорту. Может принимать входные данные или записывать данные во внешнюю систему.

-Gateways (Шлюзы)—Передает данные в интеграционный поток через интерфейс.

Вы уже видели некоторые из этих компонентов в действии, когда вы описали процесс интеграции записи файлов. Интерфейс FileWriterGateway был gateway, через который приложение отправляло текст для записи в файл. Вы также определили transformer для преобразования данного текста в верхний регистр; затем вы объявили сервисный gateway, который выполнял задачу записи текста в файл. И у потока было два канала, textInChannel и fileWriterChannel, которые связывали другие компоненты друг с другом. Теперь краткий обзор компонентов потока интеграции, как и было обещано.

9.2.1 Каналы сообщений

Каналы сообщений - это средства, с помощью которых сообщения проходят через интеграционный конвейер (рисунок 9.2). Это трубы, которые соединяют между собой все остальные части Spring Integration.

Рисунок 9.2 Каналы сообщений - это каналы, по которым потоки данных проходят между другими компонентами в потоке интеграции.

Spring Integration предоставляет несколько реализаций канала, включая следующие:

-PublishSubscribeChannel—Сообщения, опубликованные в PublishSubscribeChannel, передаются одному или нескольким потребителям. Если есть несколько потребителей, все они получают сообщение.

-QueueChannel—Сообщения, опубликованные в QueueChannel, сохраняются в очереди до тех пор, пока пользователь не извлечет их в порядке поступления (FIFO). Если есть несколько потребителей, только один из них получает сообщение.

-PriorityChannel—Как и QueueChannel, но вместо поведения FIFO сообщения извлекаются потребителями на основе заголовка приоритета сообщения.

-RendezvousChannel—Как и QueueChannel, за исключением того, что отправитель блокирует канал до тех пор, пока потребитель не получит сообщение, эффективно синхронизируя отправителя с потребителем.

-DirectChannel - Подобно PublishSubscribeChannel, но отправляет сообщение одному потребителю, вызывая потребителя в том же потоке, что и отправитель. Это позволяет транзакциям распространяться по каналу.

-ExecutorChannel - аналогичен DirectChannel, но отправка сообщения происходит через TaskExecutor, происходящий в отдельном потоке от отправителя. Этот тип канала не поддерживает транзакции, которые охватывают канал.

-FluxMessageChannel - канал сообщений Reactive Streams Publisher, основанный на Flux в Project Reactor. (Мы поговорим подробнее о Reactive Streams, Reactor и Flux в главе 10.)

Как в конфигурации Java, так и в стилях Java DSL входные каналы создаются автоматически с DirectChannel по умолчанию. Но если вы хотите использовать другую реализацию канала, вам необходимо явно объявить канал как bean-компонент и сослаться на него в потоке интеграции. Например, чтобы объявить PublishSubscribeChannel, вы должны объявить следующий метод @Bean:

@Bean

public MessageChannel orderChannel() {

return new PublishSubscribeChannel();

}

Тогда вы будете ссылаться на этот канал по имени в определении потока интеграции. Например, если канал использовался bean-компонентом-активатором службы, вы бы ссылались на него в атрибуте inputChannel @ServiceActivator:

@ServiceActivator(inputChannel="orderChannel")

Или, если вы используете стиль конфигурации JAVA DSL, вы должны ссылаться на него с помощью вызова channel():

@Beanpublic

IntegrationFlow orderFlow() {

return IntegrationFlows

.channel("orderChannel")

.get();

}

Важно отметить, что если вы используете QueueChannel, потребители должны быть настроены с помощью средства опроса. Например, предположим, что вы объявили QueueChannel bean следующим образом:

@Bean

public MessageChannel orderChannel() {

return new QueueChannel();

}

Вам нужно будет убедиться, что потребитель настроен на опрос канала для сообщений. В случае службы активатора аннотация @ServiceActivator может выглядеть следующим образом:

@ServiceActivator(inputChannel="orderChannel", poller=@Poller(fixedRate="1000"))

В этом примере служба активатора опрашивает канал с именем orderChannelevery каждую 1 секунду (или 1000 мс).

9.2.2 Фильтры

Фильтры могут быть размещены в середине конвейера интеграции, чтобы разрешить или запретить переход сообщений к следующему шагу в потоке (рис.9.3).

Рисунок 9.3 Фильтры, основанные на некоторых критериях, разрешают или запрещают передачу сообщений в конвейере.

Например, предположим, что сообщения, содержащие целочисленные значения, публикуются через канал с именем numberChannel, но вы хотите, чтобы на канал с именем evenNumberChannel передавались только четные числа. В этом случае вы можете объявить фильтр с аннотацией @Filter следующим образом:

@Filter(inputChannel="numberChannel", outputChannel="evenNumberChannel")

public boolean evenNumberFilter(Integer number) {

return number % 2 == 0;

}

Кроме того, если вы используете стиль конфигурации JAVA DSL для определения потока интеграции, вы можете сделать вызов filter() следующим образом:

@Bean

public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {

return IntegrationFlows

.filter((p) -> p % 2 == 0)

.get();

}

В этом случае для реализации фильтра используется лямбда. Но, по правде говоря, метод filter() принимает GenericSelector в качестве аргумента. Это означает, что вы можете реализовать интерфейс GenericSelector вместо этого, если ваша фильтрация слишком сложна для простой лямбды.

9.2.3 Трансформаторы

Трансформаторы выполняют некоторые операции над сообщениями, обычно приводя к другому сообщению и, возможно, с другим типом полезной нагрузки (см. рис.9.4). Преобразование может быть чем-то простым, например, выполнение математических операций над числом или манипулирование строковым значением. Или преобразование может быть более сложным,например, использование строкового значения, представляющего ISBN, для поиска и возврата сведений о соответствующей книге.

Рисунок 9.4 Трансформаторы преобразуют сообщения по мере их прохождения через поток интеграции.

Например, предположим, что целочисленные значения публикуются на канале с именем numberChannel, и вы хотите преобразовать эти числа в String, содержащую Римский числовой эквивалент. В этом случае вы можете объявить bean GenericTransformer и аннотировать его с помощью @Transformer следующим образом:

@Bean

@Transformer(inputChannel="numberChannel", outputChannel="romanNumberChannel")

public GenericTransformer romanNumTransformer() {

return RomanNumbers::toRoman;

}

Аннотация @Transformer определяет этот компонент как компонент transformer, который получает целочисленные значения из канала с именем numberChannel и использует статический метод с именем toRoman() для выполнения преобразования. (Метод toRoman () статически определен в классе с именем RomanNumbers и ссылается здесь со ссылкой на метод) Результат будет опубликован на канале с именем romanNumberChannel.

В стиле конфигурации Java DSL еще проще с вызовом transform(), передавая ссылку на метод toRoman():

@Bean

public IntegrationFlow transformerFlow() {

return IntegrationFlows

.transform(RomanNumbers::toRoman)

.get();

}

Хотя вы использовали ссылку на метод в обоих примерах кода преобразователя, знайте, что преобразователь также можно указать как лямбду. Или, если преобразователь достаточно сложен, чтобы потребовался отдельный класс Java, вы можете внедрить его как bean-компонент в конфигурацию потока и передать ссылку на метод transform():

@Bean

public RomanNumberTransformer romanNumberTransformer() {

return new RomanNumberTransformer();

}

@Bean

public IntegrationFlow transformerFlow( RomanNumberTransformer romanNumberTransformer) {

return IntegrationFlows

.transform(romanNumberTransformer)

.get();

}

Здесь вы объявляете bean-компонент типа RomanNumberTransformer, который сам является реализацией Spring Integration Transformer или интерфейса GenericTransformer. Bean внедряется в метод transformerFlow() и передается в метод transform() при определении потока интеграции.

9.2.4 Маршрутизаторы

Маршрутизаторы, основанные на некоторых критериях маршрутизации, позволяют разветвляться в потоке интеграции,направляя сообщения в разные каналы (см. рисунок 9.5).

Рис. 9.5 Маршрутизаторы направляют сообщения в различные каналы на основе некоторых критериев, применяемых к сообщениям.

Например, предположим, что у вас есть канал с именем numberChannel, через который проходят целочисленные значения. Допустим, вы хотите направить все сообщения с четными номерами в канал с именем evenChannel, а сообщения с нечетными номерами направляются в канал с именем oddChannel. Чтобы создать такую маршрутизацию в потоке интеграции, вы можете объявить bean-компонент типа AbstractMessageRouter и аннотировать bean-компонент с помощью @Router:

@Bean

@Router(inputChannel="numberChannel")

public AbstractMessageRouter evenOddRouter() {

return new AbstractMessageRouter() {

@Override

protected Collection

determineTargetChannels(Message message) {

Integer number = (Integer) message.getPayload();

if (number % 2 == 0) {

return Collections.singleton(evenChannel());

}

return Collections.singleton(oddChannel());

}

};

}

@Bean

public MessageChannel evenChannel() {

return new DirectChannel();

}

@Bean

public MessageChannel oddChannel() {

return new DirectChannel();

}

Объявленный здесь bean AbstractMessageRouter принимает сообщения от входного канала с именем numberChannel. Реализация, определенная как анонимный внутренний класс, проверяет полезную нагрузку сообщения и, если это четное число, возвращает канал с именем evenChannel (объявленный как bean). В противном случае число в полезной нагрузке канала должно быть нечетным; в этом случае возвращается канал с именем oddChannel (также созданный как bean).

В Java DSL маршрутизаторы объявляются путем вызова route() в ходе определения потока, как показано ниже:

@Bean

public IntegrationFlow numberRoutingFlow(AtomicInteger source) {

return IntegrationFlows

.route(n -> n%2==0 ? "EVEN":"ODD", mapping -> mapping

.subFlowMapping("EVEN", sf -> sf .transform(n -> n * 10)

.handle((i,h) -> { ... })

)

.subFlowMapping("ODD", sf -> sf .transform(RomanNumbers::toRoman) .handle((i,h) -> { ... })

)

)

.get();

}

Хотя по-прежнему можно объявить AbstractMessageRouter и передать его в router(), в этом примере используется лямбда для определения того, является ли полезная нагрузка сообщения четной или нечетной. Если она четная, то возвращается строковое значение EVEN. Если нечетная, то возвращается ODD.Эти значения затем используются для определения того, какое под-сопоставление будет обрабатывать сообщение.

9.2.5 Разделители

Иногда в потоке интеграции может быть полезно разделить сообщение на несколько сообщений, которые будут обрабатываться независимо. Разделители, как показано на рис.9.6, будут разделять и обрабатывать эти сообщения для вас.

Рис. 9.6 Hазделители разбивают сообщения на два или более отдельных сообщения, которые могут обрабатываться отдельными подпотоками.

Splitter-ы полезны во многих случаях, но есть два основных варианта использования, для которых вы можете использовать сплиттер:

-Полезная нагрузка сообщения содержит коллекцию элементов того же типа, которые вы хотите обработать как отдельные полезные нагрузки сообщений. Например, сообщение, содержащее список продуктов, может быть разделено на несколько сообщений с полезной нагрузкой по одному продукту в каждом

-Полезная нагрузка сообщения содержит информацию, которая, хотя и связана, может быть разделена на два или более сообщений разных типов. Например, заказ на покупку может содержать информацию о поставках, выставлении счетов и номенклатуре. Сведения о доставке могут обрабатываться одним подпотоком, выставление счетов-другим, а номенклатура - еще одним. В этом случае за разделителем обычно следует маршрутизатор, который направляет сообщения по типу полезной нагрузки, чтобы гарантировать, что данные обрабатываются правильным подпотоком.

При разделении полезной нагрузки сообщения на два или более сообщений разных типов обычно достаточно определить POJO, который извлекает отдельные части входящей полезной нагрузки и возвращает их как элементы коллекции.

Например, предположим, что вы хотите разделить сообщение, содержащее заказ на покупку, на два сообщения: одно содержит платежную информацию, а другое-список элементов списка. Следующий OrderSplitter сделает работу:

public class OrderSplitter {

public Collection splitOrderIntoParts(PurchaseOrder po)

ArrayList parts = new ArrayList<>();

parts.add(po.getBillingInfo());

parts.add(po.getLineItems());

return parts;

}

}

Затем вы можете объявить bean OrderSplitter как часть потока интеграции, аннотируя его с помощью @Splitter следующим образом:

@Bean

@Splitter(inputChannel="poChannel", outputChannel="splitOrderChannel")

public OrderSplitter orderSplitter() {

return new OrderSplitter();

}

Здесь заказы на поставку поступают на канал с именем poChannel и делятся на OrderSplitter. Затем каждый элемент в возвращенной коллекции публикуется как отдельное сообщение в потоке интеграции в канал с именем splitOrderChannel. На этом этапе потока вы можете объявить PayloadTypeRouter для маршрутизации платежной информации и позиций в их собственный подпоток:

@Bean

@Router(inputChannel="splitOrderChannel")

public MessageRouter splitOrderRouter() {

PayloadTypeRouter router = new PayloadTypeRouter();

router.setChannelMapping(

BillingInfo.class.getName(), "billingInfoChannel");

router.setChannelMapping(

List.class.getName(), "lineItemsChannel");

return router;

}

Как следует из его названия, PayloadTypeRouter направляет сообщения на разные каналы на основе их типа полезной нагрузки. Здесь сконфигурировано так что, сообщения, чья полезная нагрузка имеет тип BillingInfo, направляются в канал с именем billingInfoChannel для дальнейшей обработки. Что касается позиций, они находятся в коллекции java.util.List; следовательно, вы сопоставили полезные нагрузки типа List для направления на канал с именем lineItemsChannel.

В настоящее время поток разделяется на два подпотока: один, через который проходят объекты BillingInfo, и другой, через который проходит List. Но что, если вы хотите разбить его дальше так, чтобы вместо работы со списком LineItems вы обрабатывали каждый LineItem отдельно? Все, что вам нужно сделать, чтобы разделить список элементов строки на несколько сообщений, по одному для каждой позиции, - это написать метод (не bean-компонент), который аннотируется с помощью @Splitter и возвращает коллекцию LineItems, возможно, что-то вроде этого:

@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")

public List lineItemSplitter(List lineItems) {

return lineItems;

}

Когда сообщение, несущее полезную нагрузку List, поступает в канал с именем lineItemsChannel, оно передается методу lineItemSplitter(). Согласно правилам разделителя, метод должен возвращать коллекцию элементов, подлежащих разделению. В этом случае у вас уже есть коллекция LineItems, поэтому вы просто возвращаете коллекцию напрямую. В результате каждый LineItem в коллекции публикуется в своем собственном сообщении для канала с именем lineItemChannel.

Если вы предпочитаете использовать Java DSL для объявления конфигурации сплиттера/маршрутизатора, вы можете сделать это с помощью вызовов функций split() и route():

return IntegrationFlows

.split(orderSplitter())

. route(

p -> {

if (p.getClass().isAssignableFrom(BillingInfo.class)) {

return "BILLING_INFO";

} else {

return "LINE_ITEMS";

}

}, mapping -> mapping

.subFlowMapping("BILLING_INFO", sf -> sf

. handle((billingInfo, h) -> {

}))

.subFlowMapping("LINE_ITEMS", sf -> sf

.split()

. handle((lineItem, h) -> {

}))

)

.get();

Форма определения потока в DSL определенно более краткая, но и более трудная для понимания. Для разделения порядка используется тот же OrderSplitter, что и в примере конфигурации Java. После разделения заказ разбивается по типу на два отдельных подпотока.

9.2.6 Активаторы службы

Активаторы службы получают сообщения из входного канала и отправляют эти сообщения в реализацию MessageHandler, как показано на рисунке 9.7.

Рис. 9.7. Активаторы службы вызывают некоторую службу посредством MessageHandler при получении сообщения.

Spring Integration предлагает несколько реализаций MessageHandler «из коробки» (даже PayloadTypeRouter - это реализация MessageHandler), но вам часто потребуется предоставить некоторую пользовательскую реализацию, которая будет действовать как активатор службы. Например, в следующем коде показано, как объявить bean MessageHandler, настроенный как активатор службы:

@Bean

@ServiceActivator(inputChannel="someChannel")

public MessageHandler sysoutHandler() {

return message -> {

System.out.println("Message payload: " + message.getPayload());

};

}

Bean аннотируется @ServiceActivator, чтобы обозначить его как активатор службы, который обрабатывает сообщения из канала с именем someChannel. Что касается MessageHandler, он реализован через лямбду. Несмотря на то, что это простой MessageHandler, при получении сообщения он передает свою полезную нагрузку в стандартный поток вывода.

Кроме того, можно объявить активатор службы, который обрабатывает данные во входящем сообщении перед возвратом новой полезной нагрузки. В этом случае компонент должен быть GenericHandler, а не MessageHandler:

@Bean

@ServiceActivator(inputChannel="orderChannel", outputChannel="completeOrder")

public GenericHandler orderHandler( OrderRepository orderRepo) {

return (payload, headers) -> {

return orderRepo.save(payload);

};

}

В этом случае активатор службы является GenericHandler, который ожидает сообщения с полезной нагрузкой типа Order. Когда заказ поступает, он сохраняется через репозиторий; полученный сохраненный заказ возвращается для отправки в выходной канал, имя которого completeChannel.

Вы могли заметить, что GenericHandler предоставляет не только полезную нагрузку, но и заголовки сообщений (даже если пример не использует эти заголовки в любом случае). Вы также можете использовать активаторы служб в стиле конфигурации JAVA DSL, передавая MessageHandler или GenericHandler для метода handle () в определении потока:

public IntegrationFlow someFlow() {

return IntegrationFlows

.handle(msg -> {

System.out.println("Message payload: " + msg.getPayload());

})

.get();

}

В этом случае MessageHandler задается как лямбда, но вы также можете предоставить его как ссылку на метод или даже как экземпляр класса, который реализует интерфейс MessageHandler. Если вы даете ему ссылку на лямбду или метод, имейте в виду, что он принимает сообщение в качестве параметра.

Аналогично, handle() может быть написан для принятия GenericHandler, если активатор службы не предназначен для завершения потока. Применяя ранее описанный активатор службы сохранения заказов, вы можете настроить поток с помощью Java DSL следующим образом:

public IntegrationFlow orderFlow(OrderRepository orderRepo) {

return IntegrationFlows

.handle((payload, headers) -> {

return orderRepo.save(payload);

})

.get();

}

При работе с GenericHandler, ссылка на лямбду или метод принимает полезную нагрузку и заголовки сообщений в качестве параметров. Кроме того, если вы решите использовать GenericHandler в конце потока, вам нужно будет вернуть null, иначе вы получите ошибки, указывающие, что нет указанного выходного канала.

9.2.7 Шлюзы

Шлюзы-это средства, с помощью которых приложение может передавать данные в поток интеграции и, при необходимости, получать ответ, который является результатом потока. Реализованные с помощью Spring Integration шлюзы реализуются как интерфейсы, которые приложение может вызывать для отправки сообщений в поток интеграции (рис.9.8).

Рисунок 9.8 Сервисные шлюзы - это интерфейсы, через которые приложение может отправлять сообщения в интеграционный поток.

Вы уже видели пример шлюза сообщений со шлюзом FileWriterGateway. FileWriterGateway был односторонним шлюзом с методом, принимающим String для записи в файл, возвращая void. Примерно так же легко написать двусторонний шлюз. При написании интерфейса шлюза убедитесь, что метод возвращает некоторое значение для публикации в потоке интеграции.

В качестве примера представьте себе шлюз, который находится в простом интеграционном потоке, который принимает String и переводит данную строку в верхний регистр. Интерфейс шлюза может выглядеть примерно так:

package com.example.demo;

import org.springframework.integration.annotation.MessagingGateway;

import org.springframework.stereotype.Component;

@Component

@MessagingGateway(defaultRequestChannel="inChannel",

defaultReplyChannel="outChannel")

public interface UpperCaseGateway {

String uppercase(String in);

}

Что удивительно в этом интерфейсе, так это то, что его не нужно реализовывать. Spring Integration автоматически предоставляет реализацию во время выполнения, которая отправляет и получает данные по указанным каналам.

При вызове метода uppercase() данная String публикуется в потоке интеграции в канал с именем inChannel. И, независимо от того, как определяется поток или что он делает, когда данные поступают в канал с именем outChannel, он возвращается из метода верхнего uppercase().

Что касается потока интеграции в верхнем регистре, то это упрощенный процесс интеграции с единственным шагом преобразования String в верхний регистр. Опишем в конфигурации Java DSL:

@Bean

public IntegrationFlow uppercaseFlow() {

return IntegrationFlows

.from("inChannel")

. transform(s -> s.toUpperCase())

.channel("outChannel")

.get();

}

Здесь описано, поток начинается с данных, поступающих в канал с именем inChannel. Затем полезная нагрузка сообщения преобразуется преобразователем, который здесь определен как лямбда-выражение, для выполнения операции преобразования в верхний регистр. Полученное сообщение затем публикуется в канал с именем outChannel, который вы объявили в качестве канала ответа для интерфейса UpperCaseGateway.

9.2.8 Канальные адаптеры

Канальные адаптеры представляют точки входа и выхода потока интеграции. Данные входят в поток интеграции через адаптер входящего канала и выходят из потока интеграции через адаптер исходящего канала. Это показано на рисунке 9.9.

Рис. 9.9. Адаптеры канала - это точки входа и выхода потока интеграции.

Адаптеры входящего канала могут принимать различные формы в зависимости от источника данных, вводимых в поток. Например, вы можете объявить адаптер входящего канала, который вводит инкрементные числа из AtomicInteger в поток. При использовании Java конфигурации это может выглядеть так:

@Bean

@InboundChannelAdapter(

poller=@Poller(fixedRate="1000"), channel="numberChannel")

public MessageSource numberSource(AtomicInteger source) {

return () -> {

return new GenericMessage<>(source.getAndIncrement());

};

}

Этот @Bean метод объявляет компонент адаптера входящего канала, который в соответствии с аннотацией @InboundChannelAdapter отправляет число из введенного AtomicInteger в канал с именем numberChannel каждые 1 секунду (или 1000 мс).

Принимая во внимание, что @InboundChannelAdapter указывает адаптер входящего канала при использовании конфигурации Java, метод from() - это то, как это делается при использовании Java DSL для определения потока интеграции. В следующем фрагменте определения потока показан аналогичный адаптер входящего канала, определенный в Java DSL:

@Bean

public IntegrationFlow someFlow(AtomicInteger integerSource) {

return IntegrationFlows

.from(integerSource, "getAndIncrement", c -> c.poller(Pollers.fixedRate(1000)))

.get();

}

Часто адаптеры каналов предоставляются одним из множества модулей конечной точки Spring Integration. Предположим, например, что вам нужен адаптер входящего канала, который отслеживает указанный каталог и отправляет любые файлы, которые записываются в этот каталог, как сообщения в канал с именем file-channel. Следующая конфигурация Java использует FileReadingMessageSource из модуля Spring Integration’s file endpoint для достижения этой цели:

@Bean

@InboundChannelAdapter(channel="file-channel", poller=@Poller(fixedDelay="1000"))

public MessageSource fileReadingMessageSource() {

FileReadingMessageSource sourceReader = new FileReadingMessageSource();

sourceReader.setDirectory(new File(INPUT_DIR));

sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));

return sourceReader;

}

При записи эквивалентного адаптера входящего канала для чтения файлов в Java DSL метод inboundAdapter() из класса Files выполняет то же самое. Адаптер внешнего канала - это конец строки для процесса интеграции, передающий окончательное сообщение приложению или какой-либо другой системе:

@Bean

public IntegrationFlow fileReaderFlow() {

return IntegrationFlows

.from(Files.inboundAdapter(new File(INPUT_DIR))

.patternFilter(FILE_PATTERN))

.get();

}

Активаторы сервисов, реализованные как обработчики сообщений, часто служат для адаптера исходящего канала, особенно когда данные должны быть переданы самому приложению. Мы уже обсуждали сервисы активаторы, поэтому нет смысла повторять это обсуждение.

Однако стоит отметить, что модули Spring Integration endpoint предоставляют полезные обработчики сообщений для нескольких распространенных случаев использования. Вы видели пример такого адаптера исходящего канала, FileWritingMessageHandler, в листинге 9.3. Говоря о модулях Spring Integration endpoint, давайте кратко рассмотрим, какие готовые к использованию модули конечных точек интеграции доступны.

9.2.9 Модули конечных точек (endpoint)

Замечательно, что Spring Integration позволяет вам создавать свои собственные адаптеры канала. Но еще лучше то, что Spring Integration предоставляет более двух десятков модулей конечных точек, содержащих адаптеры каналов - как входящие, так и исходящие - для интеграции с различными общими внешними системами, в том числе перечисленными в таблице 9.1.

Таблица 9.1. Spring Integration предоставляет более двух десятков моделей конечных точек для интеграции с внешними системами.

Модуль : Идентификатор артефакта зависимости (идентификатор группы: org.springframework.integration)

AMQP: spring-integration-amqp

Spring application events : spring-integration-event

RSS and Atom : spring-integration-feed

Filesystem : spring-integration-file

FTP/FTPS : spring-integration-ftp

GemFire : spring-integration-gemfire

HTTP : spring-integration-http

JDBC : spring-integration-jdbc

JPA : spring-integration-jpa

JMS : spring-integration-jms

Email : spring-integration-mail

MongoDB : spring-integration-mongodb

MQTT : spring-integration-mqtt

Redis : spring-integration-redis

RMI : spring-integration-rmi

SFTP : spring-integration-sftp

STOMP : spring-integration-stomp

Stream : spring-integration-stream

Syslog : spring-integration-syslog

TCP/UDP : spring-integration-ip

Twitter : spring-integration-twitter

Web Services : spring-integration-ws

WebFlux : spring-integration-webflux

WebSocket : spring-integration-websocket

XMPP : spring-integration-xmpp

ZooKeeper : spring-integration-zookeeper

Из таблицы 9.1 ясно, что Spring Integration предоставляет обширный набор компонентов для удовлетворения многих потребностей интеграции. Большинству приложений никогда не потребуется даже часть того, что предлагает Spring Integration. Но хорошо знать, что Spring Integration может, на случай если вам вдруг что-то из этого перечня понадобится.

Более того, было бы невозможно охватить все адаптеры каналов, предоставляемые модулями, перечисленными в таблице 9.1, в этой главе. Вы уже видели примеры, которые используют модуль файловой системы для записи в файловую систему. И вы скоро будете использовать модуль электронной почты для чтения электронных писем.

Каждый из модулей конечных точек предлагает канальные адаптеры, которые могут быть либо объявлены как bean-ы при использовании Java конфигурации, либо ссылаться на статические методы при использовании Java DSL конфигурации. Я рекомендую вам изучить любые другие endpoint модули, которые вас интересуют больше всего. Вы обнаружите, что они довольно последовательны в том, как они используются. Но сейчас давайте обратим наше внимание на endpoint модуль электронной почты, чтобы узнать, как вы можете использовать его в приложении Taco Cloud.

9.3 Создание интеграционного потока электронной почты

Вы решили, что Taco Cloud должна позволить своим клиентам отправлять свои тако-дизайны и размещать заказы по электронной почте. Вы рассылаете листовки и размещаете рекламу в газетах, предлагая всем отправлять свои заказы тако по электронной почте. Это огромный успех! К сожалению, это слишком успешно. На адрес электронной почты поступает так много писем, что вам нужно нанять отдельного человека, чтобы читать все письма и отправлять детали заказа в систему заказов.

В этом разделе вы реализуете интеграционный поток, который опрашивает почтовый ящик Taco Cloud на наличие электронных писем с заказами, анализирует электронные письма на предмет деталей заказа и отправляет заказы в Taco Cloud для обработки. Короче говоря, поток интеграции, который вам понадобится, будет использовать адаптер входящего канала из модуля конечной точки электронной почты для загрузки электронной почты из папки входящих сообщений Taco Cloud в поток интеграции.

Следующим шагом в процессе интеграции будет разбор электронных писем в объекты заказов, которые передаются другому обработчику для отправки заказов в REST API Taco Cloud, где они будут обрабатываться так же, как и любой заказ. Для начала давайте определим простой класс свойств конфигурации, чтобы отразить особенности обработки электронной почты Taco Cloud:

@Data

@ConfigurationProperties(prefix="tacocloud.email")

@Component

public class EmailProperties {

private String username;

private String password;

private String host;

private String mailbox;

private long pollRate = 30000;

public String getImapUrl() {

return String.format("imaps://%s:%s@%s/%s",

this.username, this.password, this.host, this.mailbox);

}

}

Как вы можете видеть, EmailProperties содержит свойства, которые используются для создания URL-адреса IMAP. Этот поток использует этот URL-адрес для подключения к серверу электронной почты Taco Cloud и опроса электронных писем. К числу свойств относятся имя пользователя и пароль пользователя электронной почты, а также имя хоста сервера IMAP, почтовый ящик для опроса и скорость, с которой опрашивается почтовый ящик (по умолчанию каждые 30 секунд).

Класс EmailProperties аннотируется на уровне класса @ConfigurationProperties с атрибутом prefix, установленным в tacocloud.email. Это означает, что вы можете настроить детали использования электронной почты в файле application.yml следующим образом:

tacocloud:

email:

host: imap.tacocloud.com

mailbox: INBOX

username: taco-in-flow

password: 1L0v3T4c0s

poll-rate: 10000

Теперь давайте воспользуемся EmailProperties для настройки потока интеграции. Поток, который вы создадите, будет немного похож на рисунок 9.10.

1)Email (IMAP)адаптер входящего канала

3)Mail-to-ordertransformer

5)Отправить заказ на адаптер исходящего канала

Рисунок 9.10 Поток интеграции для приема заказов тако по электронной почте

У вас есть два варианта определения этого потока:

-Определить внутри приложения Taco Cloud - в конце потока сервис-активатор вызовет репозитории, которые вы определили, чтобы создать заказ тако.

-Определите как отдельное приложение.- В конце потока активатор службы отправит запрос POST в Taco Cloud API для отправки заказа тако.

Что вы выберете, мало влияет на сам поток, помимо того, как реализован сервисный активатор. Но поскольку вам потребуются некоторые типы, которые представляют собой тако, заказы и ингредиенты, которые несколько отличаются от тех, которые вы уже определили в основном приложении Taco Cloud, продолжите определение определять интеграционный поток в отдельном приложении, чтобы избежать путаница с существующими типами доменов.

У вас также есть выбор определения потока с использованием XML конфигурации, Java конфигурации или Java DSL. Мне нравится элегантность Java DSL, так что и вы вслед за мной будете ее использовать. Не стесняйтесь писать поток, используя один из других стилей конфигурации, если вам требуются какие-то дополнительные возможности. А пока давайте взглянем на Java DSL конфигурацию для потока электронной почты заказа тако, как показано ниже.

Листинг 9.5. Определение потока интеграции для приема электронных писем и отправки их в качестве заказов

package tacos.email;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.integration.dsl.IntegrationFlow;

import org.springframework.integration.dsl.IntegrationFlows;

import org.springframework.integration.dsl.Pollers;

@Configuration

public class TacoOrderEmailIntegrationConfig {

@Bean

public IntegrationFlow tacoOrderEmailFlow(

EmailProperties emailProps,

EmailToOrderTransformer emailToOrderTransformer,

OrderSubmitMessageHandler orderSubmitHandler) {

return IntegrationFlows

.from(Mail.imapInboundAdapter(emailProps.getImapUrl()),

e -> e.poller(

Pollers.fixedDelay(emailProps.getPollRate())))

.transform(emailToOrderTransformer)

.handle(orderSubmitHandler)

.get();

}

}

Поток электронной почты заказа тако, определенный в методе tacoOrderEmailFlow(), состоит из трех отдельных компонентов:

-Адаптер входящего канала электронной почты IMAP - этот адаптер канала создается с помощью URL-адреса IMP, созданного методом getImapUrl() объекта EmailProperties, и опрашивает с задержкой, установленной в свойстве pollRate объекта EmailProperties. Входящие письма передаются по каналу, соединяющему его с трансформатором.

-Трансформатор, который преобразует электронную почту в объект заказа - Преобразователь реализуется в EmailToOrderTransformer, который внедряется в метод tacoOrderEmailFlow(). Заказы, полученные в результате преобразования, передаются конечному компоненту через другой канал.

-Обработчик (выступающий в качестве адаптера исходящего канала)- Обработчик принимает объект заказа и отправляет его по REST API Taco Cloud.

Вызов Mail.imapInboundAdapter() стал возможен благодаря включению endpoint модуля электронной почты в качестве зависимости в сборку проекта. Зависимость Maven выглядит так:

org.springframework.integration

spring-integration-file

Класс EmailToOrderTransformer является реализацией интерфейса Spring Integration Transformer посредством расширения AbstractMailMessageTransformer (показано в следующем листинге).

Листинг 9.6. Преобразование входящих писем в тако-заказы с использованием интеграционного преобразователя

@Component

public class EmailToOrderTransformer

extends AbstractMailMessageTransformer {

@Override

protected AbstractIntegrationMessageBuilder doTransform(Message mailMessage)

throws Exception {

Order tacoOrder = processPayload(mailMessage);

return MessageBuilder.withPayload(tacoOrder);

}

}

AbstractMailMessageTransformer - удобный базовый класс для обработки сообщений, чья полезная нагрузка - это электронная почта. Он заботится о извлечении информации электронной почты из входящего сообщения в объект Message, который передается в метод doTransform().

В методе doTransform() вы передаете Message private методу с именем processPayload() для анализа электронной почты на получения объекта Order. Несмотря на то, что этот объект Order не похож на объект Order, используемый в основном приложении TacoCloud; это немного проще:

package tacos.email;

import java.util.ArrayList;

import java.util.List;

import lombok.Data;

@Data

public class Order {

private final String email;

private List tacos = new ArrayList<>();

public void addTaco(Taco taco) {

this.tacos.add(taco);

}

}

Вместо того, чтобы содержать всю информацию о доставке и выставлении счетов клиентам, этот класс Order несет только электронную почту клиента, полученную из входящей электронной почты.

Парсинг электронных писем в тако-заказы является нетривиальной задачей. На самом деле, даже простенькая реализация включает в себя несколько десятков строк кода. И эти несколько десятков строк кода ничего не дадут для дальнейшего обсуждения Spring Integration и того, как реализовать трансформер. Поэтому, чтобы сэкономить место, я опускаю детали метода processPayload().

Последнее, что делает EmailToOrderTransformer, это возвращает MessageBuilder с полезной нагрузкой, содержащей объект Order. Сообщение, сгенерированное MessageBuilder, отправляется последнему компоненту в потоке интеграции: обработчику сообщений, который отправляет заказ в Taco Cloud API. OrderSubmitMessageHandler, показанный в следующем листинге, реализует GenericHandler в Spring Integration для обработки сообщений с полезной нагрузкой Order.

Листинг 9.7. Отправка заказов в Taco Cloud API через обработчик сообщений

package tacos.email;

import java.util.Map;

import org.springframework.integration.handler.GenericHandler;

import org.springframework.stereotype.Component;

import org.springframework.web.client.RestTemplate;

@Component

public class OrderSubmitMessageHandler implements GenericHandler {

private RestTemplate rest;

private ApiProperties apiProps;

public OrderSubmitMessageHandler(

ApiProperties apiProps, RestTemplate rest) {

this.apiProps = apiProps;

this.rest = rest;

}

@Override

public Object handle(Order order, Map headers) {

rest.postForObject(apiProps.getUrl(), order, String.class);

return null;

}

}

Чтобы удовлетворить требования интерфейса GenericHandler, OrderSubmitMessageHandler переопределяет метод handle(). Этот метод получает входящий объект Order и использует внедренный RestTemplate для отправки Order посредством POST запроса на URL-адрес, полученный из объекте ApiProperties. Наконец, метод handle() возвращает null, чтобы указать, что этот обработчик отмечает конец потока.

ApiProperties используется, чтобы избежать жесткого кодирования URL-адреса при вызове postForObject(). Это файл свойств конфигурации, который выглядит следующим образом:

@Data

@ConfigurationProperties(prefix="tacocloud.api")

@Componentpublic class ApiProperties {

private String url;

}

А в application.yml URL-адрес для Taco Cloud API может быть настроен следующим образом:

tacocloud:

api:

url: http://api.tacocloud.com

Чтобы сделать RestTemplate доступным в проекте, чтобы его можно было внедрить в OrderSubmitMessageHandler, необходимо добавить веб-стартер Spring Boot в сборку проекта:

org.springframework.boot

spring-boot-starter-web

В то время как это делает RestTemplate доступным в classpath, оно также запускает автоконфигурирование для Spring MVC. В качестве автономного потока Spring Integration приложению не требуется Spring MVC или даже встроенный Tomcat, который обеспечивает автоконфигурация. Поэтому вам следует отключить автоконфигурирование Spring MVC с помощью следующей записи в application.yml:

spring:

main:

web-application-type: none

Свойство spring.main.web-application-type может быть установлено в servlet, reactive или none. Когда Spring MVC находится в classpath, автоконфигурация устанавливает его значение в servlet. Но здесь вы переопределяете его на none, чтобы Spring MVC и Tomcat не были автоматически сконфигурированы. (Мы поговорим подробнее о том, что означает, что приложение является reactive веб-приложением в главе 11.)

ИТОГ:

-Spring Integration позволяет определять потоки, через которые данные могут обрабатываться при их поступлении в приложение или на выходе из него.

-Интеграционные потоки могут быть определены в XML, Java или с использованием краткого стиля конфигурации Java DSL.

-Шлюзы сообщений и адаптеры каналов действуют как точки входа и выхода интеграционного потока.

-Сообщения могут быть преобразованы, разделены, агрегированы, направлены и обработаны активаторами службы в ходе потока.

-Каналы сообщений соединяют компоненты потока интеграции.

Spring in Action Covers Spring 5.0 перевод на русский. Глава 10

РАЗДЕЛ 3

Реактивный Spring

В разделе 3, мы рассмотрим новую замечательную поддержку реактивного программирования в Spring. В главе 10 обсуждаются основы реактивного программирования с Project Reactor, библиотекой реактивного программирования, которая лежит в основе реактивных функций Spring 5. Затем мы рассмотрим некоторые из наиболее полезных реактивных операций Reactor. В главе 11 мы вернемся к разработке REST API, представив Spring WebFlux, новую веб-инфраструктуру, которая позаимствовала многое из Spring MVC и предлагает новую реактивную модель для веб-разработки. В главе 12 завершается третья часть, в которой описывается постоянное сохранение реактивных данных с помощью Spring Data для чтения и записи данных в базы данных Cassandra и Mongo.

10. Знакомство с Reactor

В этой главе рассматривается

-Понимание реактивного программирования

-Project Reacto

-Реактивная работа с данными

Вы когда-нибудь имели подписку на газету или журнал? Интернет определенно уменьшил число подписчиков традиционных публикаций, но было время, когда подписка на газеты была одним из лучших способов быть в курсе событий дня. Вы можете рассчитывать на свежую доставку информации о текущих событиях каждое утро, чтобы читать во время завтрака или по дороге на работу.

Теперь предположим, что если после оплаты вашей подписки прошло несколько дней, и издания не были доставлены. Проходит еще несколько дней, и вы звоните в отдел продаж, чтобы узнать, почему вы еще не получили ежедневную газету. Представьте свой сюрприз, если они объяснят: «Вы заплатили за целый год газет. Год еще не закончен. Вы обязательно получите их все, как только будет готово полное годовое издание газет».

К счастью, это совсем не то, как работают подписки. Газеты имеют определенную периодичность. Они доставляются как можно быстрее после публикации, чтобы их можно было прочитать, пока их содержание еще свежо. Более того, когда вы читаете последний выпуск, газетные репортеры пишут новые истории для будущих изданий, и прессы запускаются для выпуска следующего выпуска - все параллельно.

Поскольку мы разрабатываем код приложения, мы можем написать два стиля кода: императивный и реактивный:

Императивный код очень похож на эту абсурдную гипотетическую подписку на газету. Это последовательный набор задач, каждая из которых выполняется по одной, каждая после предыдущей. Данные обрабатываются в большом количестве и не могут быть переданы следующей задаче, пока предыдущая задача не завершила свою работу с массивом данных.

Реактивный код очень похож на настоящую подписку на газету. Набор задач определен для обработки данных, но эти задачи могут выполняться параллельно. Каждая задача может обрабатывать подмножества данных, передавая их следующей задаче в очереди, пока она продолжает работать с другим подмножеством данных.

В этой главе мы временно отойдем от приложения Taco Cloud, чтобы изучить Project Reactor. Reactor - это библиотека для реактивного программирования, которая является частью семейства проектов Spring. И поскольку он служит основой поддержки реактивного программирования в Spring 5, важно, чтобы вы поняли Reactor, прежде чем мы рассмотрим создание реактивных контроллеров и репозиториев с помощью Spring. Прежде чем мы начнем работать с Reactor, давайте быстро рассмотрим основы реактивного программирования.

10.1 Понимание реактивного программирования

Реактивное программирование - это парадигма, альтернативная императивному программированию. Эта альтернатива существует, потому что реактивное программирование устраняет ограничение в императивном программировании. Понимая эти ограничения, вы можете лучше понять преимущества реактивной модели.

Обратите внимание, что реактивное программирование - это не серебряная пуля. Ни в коем случае не следует выводить из этой главы или любого другого обсуждения реактивного программирования, что императивное программирование-это зло, а реактивное программирование-ваш спаситель. Как и все, что вы изучаете как разработчик, реактивное программирование идеально подходит в некоторых случаях использования, а в других плохо подходит. Рекомендуется унция прагматизма.

Если вы похожи на меня и многих разработчиков, вы режете свои программные зубы императивным программированием. Есть хороший шанс, что большинство (или весь) код, который вы пишете сегодня, по-прежнему является императивным по своей природе. Императивное программирование достаточно интуитивно, что молодые студенты с легкостью изучают его в своих школьных программах STEM, и оно достаточно мощное, чтобы составлять основную часть кода, который используется во многих крупнейших предприятия.

Идея проста: вы пишете код в виде списка инструкций, которым нужно следовать, по одному в том порядке, в котором они встречаются. Задача выполнена, и программа ожидает ее завершения, прежде чем перейти к следующей задаче. На каждом этапе обработки данные, которые должны быть обработаны, должны быть полностью доступны, чтобы их можно было обрабатывать целиком.

Во время выполнения задачи, особенно если это задача I/O, такая как запись данных в базу данных или выборка данных с удаленного сервера, поток, вызвавший эту задачу, блокируется и не может ничего сделать, пока задача не завершится. Проще говоря, заблокированные потоки расточительны.

Большинство языков программирования, включая Java, поддерживают параллельное программирование. Довольно просто запустить другой поток в Java и отправить его на выполнение некоторой работы, в то время как вызывающий поток продолжает что-то еще. Но хотя создавать потоки легко, эти потоки, скорее всего, сами блокируются. Управление параллелизмом в нескольких потоках является сложной задачей. Больше потоков означает больше сложности.

Напротив, реактивное программирование носит функциональный и декларативный характер. Вместо описания набора шагов, которые должны выполняться последовательно, реактивное программирование включает описание конвейера или потока, через который проходят данные. Вместо того чтобы требовать, чтобы данные были доступны для обработки в целом, реактивный поток обрабатывает данные по мере их поступления. Фактически, поступающие данные могут быть бесконечными (например, постоянный поток данных о температуре в реальном времени).

Чтобы применить аналогию с реальным миром, рассмотрим императивное программирование как водяной шар и реактивное программирование как садовый шланг. Оба являются подходящими способами удивить и замочить ничего не подозревающего друга в жаркий летний день. Но они отличаются по стилю исполнения:

-Воздушный шар воды несет свою полезную нагрузку всю сразу, замачивая свою намеченную цель в момент удара. Однако водяной шар имеет конечную емкость, и если вы хотите замочить больше людей (или одного и того же человека в большей степени), ваш единственный выбор-увеличить количество водных шаров.

-Садовый шланг несет свою полезную нагрузку в виде потока воды, который течет от крана к соплу. Емкость садового шланга может быть конечной в любой момент времени, но она неограничена в течение водного боя. Пока вода поступает в шланг из крана, она будет продолжать течь через шланг и распыляться из сопла. Тот же садовый шланг легко масштабируется, чтобы замочить столько друзей, сколько вы хотите.

Нет ничего изначально плохого в водяных шарах (или императивном программировании), но человек, держащий садовый шланг (или применяющий реактивное программирование), имеет преимущество в отношении масштабируемости и производительности.

10.1.1 Определение реактивных потоков

Reactive Streams - инициатива, начатая в конце 2013 года инженерами из Netflix, Lightbend и Pivotal (компания, стоящая за Spring). Reactive Streams стремится обеспечить стандарт для асинхронной обработки потока с неблокирующим обратным давлением (backpressure).

Мы уже затронули асинхронную черту реактивного программирования; это то, что позволяет нам выполнять задачи параллельно для достижения большей масштабируемости. Противодавление - это средство, с помощью которого потребители данных могут избежать перегруженности слишком быстрым источником данных, устанавливая ограничения на то, сколько они готовы обработать.

Java Streams vs. Reactive Streams

Существует много общего между Java Streams и Reactive Streams. Для начала, они оба имеют слово потоки в своих именах. Они также предоставляют функциональный API для работы с данными. На самом деле, как вы увидите позже, когда мы посмотрим на Reactor, они даже выполняют одни и те же операции.

Однако Java Streams обычно являются синхронными и работают с конечным набором данных. По сути, они являются средством перебора коллекции с помощью функций.

Reactive Streams поддерживают асинхронную обработку наборов данных любого размера, включая бесконечные наборы данных. Они обрабатывают данные в режиме реального времени, когда они становятся доступными, с противодавлением (backpressure), чтобы не перегружать своих потребителей.

Спецификация реактивных потоков может быть описана четырьмя определениями интерфейсами: Publisher, Subscriber, Subscription, и Processor. Publisher создает данные, которые он отправляет Subscriber на Subscription. Интерфейс Publisher объявляет единый метод subscribe() с помощью которого Subscriber может подписаться на Publisher:

public interface Publisher {

void subscribe(Subscriber subscriber);

}

После того как Subscriber подписался, он может получать события от Publisher. Эти события отправляются через методы интерфейса Subscriber:

public interface Subscriber {

void onSubscribe(Subscription sub);

void onNext(T item);

void onError(Throwable ex);

void onComplete();

}

Первое событие, которое получит Subscriber, - это вызов функции onSubscribe(). Когда Publisher вызывает функцию onSubscribe(), он передает Subscription объект Subscriber. Именно через Subscription Subscriber может управлять своей подпиской:

public interface Subscription {

void request(long n);

void cancel();

}

Subscriber может вызвать функцию request(), чтобы запросить отправку данных, или функцию cancel(), чтобы указать, что он больше не заинтересован в получении данных и отменяет подписку. При вызове функции request() Subscriber передает long значение, чтобы указать, сколько элементов данных он готов принять. Именно здесь возникает обратное давление (backpressure), препятствующее Publisher отправлять больше данных, чем может обработать Subscriber. После того, как Publisher отправил столько элементов, сколько было запрошено, Subscriber может снова вызвать функцию request(), чтобы запросить больше.

После того, как Subscriber запросил данные, данные начинают поступать через поток. Для каждого элемента, опубликованного Publisher, будет вызван метод onNext() для доставки данных Subscriber. Если есть какие-либо ошибки, вызывается onError(). Если у Publisher нет больше данных для отправки и он не будет генерировать больше данных, он вызовет onComplete(), чтобы сообщить подписчику, что он завершил процесс.

Что касается интерфейса Processor, это комбинация Subscriber и Publisher, как показано здесь:

public interface Processor extends Subscriber, Publisher {}

Как Subscriber, Processor будет получать данные и обрабатывать их каким-либо образом. Затем он будет “переоденется” и выступит в качестве Publisher, чтобы публиковать результаты для своих Subscribers.

Как вы можете видеть, спецификация Reactive Streams довольно проста. Довольно легко понять, как можно построить конвейер обработки данных, который начинается с Publisher, прогоняет данные через ноль или более Processors, а затем передает конечные результаты в Subscriber.

Однако интерфейсы Reactive Streams не могут использоваться для создания такого потока функциональным способом. Project Reactor - это реализация спецификации Reactive Streams, которая предоставляет функциональный API для создания Reactive Streams. Как вы увидите в следующих главах, Reactor является основой модели реактивного программирования в Spring 5. В оставшейся части этой главы мы собираемся исследовать (и, осмелюсь сказать, очень весело провести время) Project Reactor.

10.2 Начало работы с Reactor

Реактивное программирование требует от нас думать совсем иначе, чем императивное программирование. Вместо того, чтобы описывать набор шагов, которые необходимо предпринять, реактивное программирование означает построение конвейера, по которому будут проходить данные. Когда данные проходят через конвейер, они могут быть изменены или использованы каким-либо образом.

Например, предположим, что вы хотите взять имя человека, изменить все его буквы на заглавные, использовать его для создания приветственного сообщения, а затем, наконец, напечатать его. В модели императивного программирования код будет выглядеть примерно так:

String name = "Craig";

String capitalName = name.toUpperCase();

String greeting = "Hello, " + capitalName + "!";

System.out.println(greeting);

В императивной модели каждая строка кода выполняет шаг, один за другим, и определенно в одном и том же потоке. Каждый шаг блокирует выполнение потока до следующего шага до его завершения.

Функциональный, реактивный код может достичь того же этого же:

Mono.just("Craig")

.map(n -> n.toUpperCase())

.map(cn -> "Hello, " + cn + "!")

.subscribe(System.out::println);

Не волнуйтесь про, возможное, недопонимание этого примера; мы скоро поговорим об операциях just(), map() и subscribe(). На данный момент важно понимать, что хотя реактивный пример все еще следует пошаговой модели, на самом деле это конвейер, через который проходят данные. На каждом этапе конвейера данные каким-то образом изменяются, но нельзя точно сказать, какой поток выполняет какие операции. Все операции могут выполняться в одном потоке ... или не в одном.

Mono в этом примере является одним из двух основных типов Reactor. Второй это - Flux. Оба являются реализациями Reactive Streams Publisher. Поток представляет собой конвейер из нуля, одного или многих (потенциально бесконечных) элементов данных. Mono - это специализированный реактивный тип, оптимизированный для случаев, когда известно, что в наборе данных содержится не более одного элемента данных.

Reactor vs. RxJava (ReactiveX)

Если вы уже знакомы с RxJava или ReactiveX, возможно, вы думаете, что Mono и Flux звучат во многом как Observable и Single. На самом деле, они примерно эквивалентны семантически. Они даже предлагают много одинаковых операций.

Хотя в этой книге мы сосредоточимся на Reactor, вы, возможно, будете рады узнать, что можно скрывать типы Reactor и RxJava. Более того, как вы увидите в следующих главах, Spring также может работать с типами RxJava.

На самом деле в предыдущем примере три Mono. Операция just() создает первую. Когда Mono выдает значение, это значение присваивается операции map(), которая описывает что слово должно быть написано заглавными буквами и использовано для создания другого Mono. Когда второй Mono публикует свои данные, он передается второй операции map() для выполнения конкатенации строк, результаты которой используются для создания третьего Mono. Наконец, вызов метода subscribe() подписывается на Mono, получает данные и печатает их.

10.2.1 Схема реактивных потоков

Реактивные потоки часто иллюстрируются мраморными (marble) диаграммами. Мраморные (marble) диаграммы в своей простейшей форме изображают временную шкалу данных, потоков проходящих через Flux или Mono вверху, операцию в середине и временную шкалу результирующего Flux или Mono внизу. На рис. 10.1 показан шаблон диаграммы состояния потока. Как вы можете видеть, когда данные проходят через исходный Flux, он обрабатывается через некоторую операцию, в результате чего возникает новый Flux, через который проходят обработанные данные.

На рисунке 10.2 показана аналогичная мраморная (marble) диаграмма, но для Mono. Как видите, ключевое отличие состоит в том, что у Mono будет либо ноль, либо один элемент данных, либо ошибка.

Загрузка...