В разделе 10.3 мы рассмотрим многие операции, поддерживаемые Flux и Mono, и будем использовать мраморные (marble) диаграммы для визуализации их работы.

Рисунок 10.1 Мраморная (marble) диаграмма, иллюстрирующая основной поток Flux

Рисунок 10.2 Мраморная (marble) диаграмма, иллюстрирующая основной поток Mono

10.2.2 Добавление Reactor зависимостей

Чтобы начать работу с Reactor, добавьте следующую зависимость в сборку проекта:

io.projectreactor

reactor-core

Reactor также предоставляет отличную поддержку тестирования. Мы собираемся написать множество тестов для своего кода Reactor, поэтому вам обязательно нужно добавить эту зависимость в вашу сборку:

io.projectreactor

reactor-test

test

Я предполагаю, что вы добавляете эти зависимости в проект Spring Boot, который обрабатывает управление зависимостями (dependency management) для вас, поэтому нет необходимости указывать элемент для зависимостей. Но если вы хотите использовать Reactor в проекте, отличном от Spring Boot, вам потребуется настроить спецификацию Reactor BOM (bill of materials) в сборке. Следующая запись управления зависимостями добавляет Reactor Bismuth в сборку:

io.projectreactor

reactor-bom

Bismuth-RELEASE

pom

import

Теперь, когда Reactor находится в разработке вашего проекта, вы можете начать создавать реактивные конвейеры с Mono и Flux. В оставшейся части этой главы мы рассмотрим несколько операций, предлагаемых Mono и Flux.

10.3 Применение общих реактивных операций

Flux и Mono являются наиболее важными строительными блоками, предоставляемыми Reactor, и операции, предлагаемые этими двумя реактивными типами, представляют собой раствор, который связывает их вместе для создания конвейеров, по которым могут передаваться данные. Между Flux и Mono существует более 500 операций, каждую из которых можно условно классифицировать как

-Операции создания

-Комбинированные операции

-Операции трансформации

-Логические операции

Как бы ни было интересно смотреть на каждую из 500 операций, чтобы увидеть, как они работают, в этой главе просто не хватит места. В этом разделе я выбрал несколько наиболее полезных операций для экспериментов. Начнем с операций создания.

ПРИМЕЧАНИЕ

Где примеры Mono? Mono и Flux используют много одинаковых операций, поэтому в большинстве случаев нет необходимости показывать одну и ту же операцию дважды, один раз для Mono и еще раз для Flux. Более того, хотя операции Mono полезны, на них немного менее интересно смотреть, чем на те же самые операции, когда предоставляется Flux. Большинство примеров, с которыми мы будем работать, будут связаны с Flux. Просто знайте, что Mono обычно имеет эквивалентные операции.

10.3.1 Реактивные типы создания

Часто при работе с реактивными типами в Spring вы получаете Flux или Mono из репозитория или службы, поэтому вам не нужно создавать их самостоятельно. Но иногда вам нужно будет создать нового реактивного издателя.

Reactor предоставляет несколько операций для создания Flux и Mono. В этом разделе мы рассмотрим некоторые из наиболее полезных операций создания.

СОЗДАНИЕ ИЗ ОБЪЕКТОВ

Если у вас есть один или несколько объектов, из которых вы хотите создать Flux или Mono, вы можете использовать статический метод just() в Flux или Mono для создания реактивного типа, данные которого управляются этими объектами. Например, следующий метод тестирования создает Flux из пяти объектов String:

@Test

public void createAFlux_just() {

Flux fruitFlux = Flux

.just("Apple", "Orange", "Grape", "Banana", "Strawberry");

}

На данный момент Flux создан, но у него нет подписчиков. Без подписчиков данные не будут передаваться. Думая об аналогии с садовым шлангом, вы прикрепили садовый шланг к патрубку, и с другой стороны есть вода из коммунальной компании - но пока вы не включите патрубок, вода не будет течь. Подписка на реактивный тип - это то, как вы включаете поток данных.

Чтобы добавить подписчика, вы можете вызвать метод subscribe() в Flux:

fruitFlux.subscribe(

f -> System.out.println("Here's some fruit: " + f)

);

Здесь лямбда, заданная для subscribe(), на самом деле является java.util.Consumer, который используется для создания Reactive Streams Subscriber. При вызове subscribe() данные начинают передаваться. В этом примере промежуточных операций нет, поэтому данные передаются напрямую от Flux к Subscriber.

Печать записей из Flux или Mono на консоль - это хороший способ увидеть реактивный тип в действии. Но лучший способ на самом деле протестировать Flux или Mono - использовать StepVerifier от Reactor. С учетом Flux или Mono, StepVerifier подписывается на реактивный тип и затем применяет утверждения к данным, когда они проходят через поток, в конце концов проверяя, что поток завершается должным образом.

Например, чтобы проверить, что предписанные данные проходят через fruitFlux, вы можете написать тест, который выглядит следующим образом:

StepVerifier.create(fruitFlux)

.expectNext("Apple")

.expectNext("Orange")

.expectNext("Grape")

.expectNext("Banana")

.expectNext("Strawberry")

.verifyComplete();

В этом случае StepVerifier подписывается на Flux и затем утверждает, что каждый элемент соответствует ожидаемому названию плода. Наконец, он проверяет, что после того, как клубника произведена Flux, Flux завершается.

В оставшейся части примеров в этой главе вы будете использовать StepVerifier для написания обучающих тестов - тестов, которые проверяют поведение и помогают понять, как что-то работает, - чтобы узнать некоторые из наиболее полезных операций Reactor.

СОЗДАНИЕ ИЗ КОЛЛЕКЦИЙ

Flux также может быть создан из массива, Iterable или Java Stream. Рисунок 10.3 иллюстрирует, как это работает с мраморной диаграммой.

Рисунок 10.3. Поток может быть создан из массива, Iterable или Stream.

Чтобы создать Flux из массива, вызовите статический метод fromArray(), передав в исходный массив:

@Test

public void createAFlux_fromArray() {

String[] fruits = new String[] {

"Apple", "Orange", "Grape", "Banana", "Strawberry" };

Flux fruitFlux = Flux.fromArray(fruits);

StepVerifier.create(fruitFlux)

.expectNext("Apple")

.expectNext("Orange")

.expectNext("Grape")

.expectNext("Banana")

.expectNext("Strawberry")

.verifyComplete();

}

Поскольку исходный массив содержит те же имена фруктов, которые вы использовали при создании Flux из списка объектов, данные, передаваемые Flux, будут иметь те же значения. Таким образом, вы можете использовать тот же StepVerifier, что и раньше, чтобы проверить этот Flux.

Если вам нужно создать Flux из java.util.List, java.util.Set или любой другой реализации java.lang.Iterable, вы можете передать его в статический метод fromIterable():

@Test

public void createAFlux_fromIterable() {

List fruitList = new ArrayList<>();

fruitList.add("Apple");

fruitList.add("Orange");

fruitList.add("Grape");

fruitList.add("Banana");

fruitList.add("Strawberry");

Flux fruitFlux = Flux.fromIterable(fruitList);

// ... проверить шаги

}

Или, если у вас есть Java Stream, который вы хотели бы использовать в качестве источника для Flux, fromStream() - это метод, который вы будете использовать:

@Test

public void createAFlux_fromStream() {

Stream fruitStream =

Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");

Flux fruitFlux = Flux.fromStream(fruitStream);

// ... проверить шаги

}

Опять же, тот же StepVerifier, что и раньше, можно использовать для проверки данных, опубликованных Flux.

ГЕНЕРИРОВАНИЕ FLUX ДАННЫХ

Иногда у вас нет данных для работы, и вам просто нужно, чтобы Flux действовал как счетчик, отдавая число, которое увеличивается с каждым новым значением. Для создания счетчика Flux вы можете использовать статический метод range(). Диаграмма на рисунке 10.4 иллюстрирует, как работает range().

Рисунок 10.4 Создание Flux из диапазона приводит к публикации сообщений в противоположном стиле.

Следующий метод тестирования демонстрирует, как создать Flux диапазон:

@Test

public void createAFlux_range() {

Flux intervalFlux =

Flux.range(1, 5);

StepVerifier.create(intervalFlux)

.expectNext(1)

.expectNext(2)

.expectNext(3)

.expectNext(4)

.expectNext(5)

.verifyComplete();

}

В этом примере диапазон Flux создается с начальным значением 1 и конечным значением 5. StepVerifier доказывает, что он опубликует пять элементов, которые являются целыми числами от 1 до 5.

Еще один метод создания Flux, похожий на range(), представляет собой interval(). Как и метод range(), interval() создает поток, который возвращает увеличивающееся значение. Но то, что делает interval() особенным, заключается в том, что вместо того, чтобы указывать начальное и конечное значение, вы указываете длительность или как часто значение должно возвращаться. На рисунке 10.5 показана мраморная диаграмма для метода создания interval().

Рисунок 10.5 Flux, созданный из интервала, имеет периодическую запись, опубликованную в нем. (A Flux created from an interval has a periodic entry published to it.)

Например, чтобы создать поток интервалов, который выдает значение каждую секунду, можно использовать статический метод interval() следующим образом:

@Test

public void createAFlux_interval() {

Flux intervalFlux =

Flux.interval(Duration.ofSeconds(1))

.take(5);

StepVerifier.create(intervalFlux)

.expectNext(0L)

.expectNext(1L)

.expectNext(2L)

.expectNext(3L)

.expectNext(4L)

.verifyComplete();

}

Обратите внимание,что значение, возвращаемое потоком интервалов, начинается с 0 и увеличивается на каждом последующем элементе. Кроме того, поскольку interval() не имеет максимального значения, он потенциально будет работать вечно. Поэтому вы также используете операцию take(), чтобы ограничить результаты первыми пятью записями. Подробнее об операции take() мы поговорим в следующем разделе.

10.3.2 Комбинирование реактивных типов

Однажды может возникнуть задача когда Вам придется работать с двумя реактивными типами, которые вам нужно как-то объединить. Или, в других случаях, вам может потребоваться разделить Flux на несколько реактивных типов. В этом разделе мы рассмотрим операции, которые объединяют и разделяют Flux и Mono в Reactor.

СЛИЯНИЕ РЕАКТИВНЫХ ТИПОВ

Предположим, у вас есть два потока потока и нужно создать один результирующий поток, который будет производить данные, как только он станет доступен из любого из вышерасположенных Flux streams. Чтобы объединить один поток с другим, можно использовать операцию mergeWith(), как показано на marble диаграмме на рис.10.6.

Рисунок 10.6 слияние двух Flux потоков чередя их сообщения в новом Flux.

Например, предположим, что у вас есть Flux, значения которого являются именами телевизионных и киношных персонажей, и у вас есть второй Flux, значениями которого являются названия продуктов, которые эти персонажи любят есть. Следующий тестовый метод показывает, как можно объединить два объекта Flux с помощью метода mergeWith():

@Test

public void mergeFluxes() {

Flux characterFlux = Flux

.just("Garfield", "Kojak", "Barbossa")

.delayElements(Duration.ofMillis(500));

Flux foodFlux = Flux

.just("Lasagna", "Lollipops", "Apples")

.delaySubscription(Duration.ofMillis(250))

.delayElements(Duration.ofMillis(500));

Flux mergedFlux = characterFlux.mergeWith(foodFlux);

StepVerifier.create(mergedFlux)

.expectNext("Garfield")

.expectNext("Lasagna")

.expectNext("Kojak")

.expectNext("Lollipops")

.expectNext("Barbossa")

.expectNext("Apples")

.verifyComplete();

}

Обычно Flux публикует данные настолько быстро, насколько это возможно. Таким образом, вы используете операцию delayElements() в обоих созданных потоках Flux, чтобы немного их замедлить - отправляя запись каждые 500 мс. Кроме того, чтобы поток продуктов начинал передаваться, после Flux имен, вы применяете операцию delaySubscription() к потоку продуктов, чтобы он не отправлял никаких данных, пока не пройдет 250 мс после подписки.

После объединения двух объектов Flux создается новый объединенный Flux. Когда StepVerifier подписывается на объединенный поток, он, в свою очередь, подписывается на два исходных Flux потока, начиная поток данных.

Порядок предметов, отдаваемый из объединенного потока, совпадает со временем их передачи из источников. Поскольку оба объекта Flux настроены на отдачу с регулярной скоростью, значения будут чередоваться через объединенный поток, в результате чего будет получено имя, затем пища, затем имя и т. Д. Если время либо Flux должно быть изменены, возможно, вы увидите два персонажа или два продукта, опубликованные один за другим.

Поскольку mergeWith() не может гарантировать идеальное взаимодействие между его источниками, вы можете рассмотреть операцию zip() вместо этого. Когда два объекта Flux сжимаются вместе, это приводит к новому Flux, который создает кортеж элементов, где кортеж содержит один элемент из каждого исходного потока. На рис. 10.7 показано, как два объекта Flux можно сжать вместе.

Рис. 10.7. Сжатие двух потоков Flux приводит к созданию Flux, содержащего наборы по одному элементу от каждого потока.

Чтобы увидеть действие zip() в действии, рассмотрите следующий метод тестирования, который объединяет Flux персонажей и Flux продукты вместе:

@Test

public void zipFluxes() {

Flux characterFlux = Flux

.just("Garfield", "Kojak", "Barbossa");

Flux foodFlux = Flux

.just("Lasagna", "Lollipops", "Apples");

Flux> zippedFlux =

Flux.zip(characterFlux, foodFlux);

StepVerifier.create(zippedFlux)

.expectNextMatches(p ->

p.getT1().equals("Garfield") &&

p.getT2().equals("Lasagna"))

.expectNextMatches(p ->

p.getT1().equals("Kojak") &&

p.getT2().equals("Lollipops"))

.expectNextMatches(p ->

p.getT1().equals("Barbossa") &&

p.getT2().equals("Apples"))

.verifyComplete();

}

Обратите внимание, что в отличие от mergeWith(), операция zip() является статической операцией создания. Созданный Flux имеет идеальное выравнивание между персонажами и их любимыми блюдами. Каждый элемент, испускаемый из сжатого потока, представляет собой Tuple2 (контейнерный объект, который содержит два других объекта), содержащий элементы из каждого исходного потока в порядке их публикации.

Если вы предпочитаете не работать с Tuple2, а работать с каким-то другим типом, вы можете предоставить функцию zip(), которая создает любой объект, который вы хотите, учитывая два элемента (как показано на диаграмме marble на рисунке 10.8).

Рисунок 10.8 альтернативная форма операции zip приводит к Flux сообщений, созданных из одного элемента каждого входящего Flux.

Например, в следующем методе тестирования показано, как связать Flux имен с Flux продуктов питания, чтобы получить в результате Flux из String объектов:

@Test

public void zipFluxesToObject() {

Flux characterFlux = Flux

.just("Garfield", "Kojak", "Barbossa");

Flux foodFlux = Flux

.just("Lasagna", "Lollipops", "Apples");

Flux zippedFlux =

Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);

StepVerifier.create(zippedFlux)

.expectNext("Garfield eats Lasagna")

.expectNext("Kojak eats Lollipops")

.expectNext("Barbossa eats Apples")

.verifyComplete();

}

Функция, заданная для zip() (заданная здесь как лямбда), просто объединяет два элемента в предложение, которое отдается зипованным Flux.

ВЫБОР ПЕРВОГО РЕАКТИВНОГО ТИПА ДЛЯ ПУБЛИКАЦИИ

Предположим, у вас есть два Flux объекта, и вместо того, чтобы объединить их, вы просто хотите создать новый Flux, который будет генерировать значения из первого Flux, который создает значение. Как показано на рисунке 10.9, операция first() выбирает первый из двух объектов Flux и отображает значения, которые она публикует.

Рис. 10.9. first операция выбирает первый Flux, который отправляет сообщение, и после этого создает только сообщения из этого потока.

Следующий метод тестирования создает быстрый Flux и медленный Flux (где “медленный " означает, что он не будет публиковать элемент до 100 мс после подписки). Используя first(), он создает новый Flux, который будет публиковать значения только из первого исходного Flux для публикации значения:

@Test

public void firstFlux() {

Flux slowFlux = Flux.just("tortoise", "snail", "sloth")

.delaySubscription(Duration.ofMillis(100));

Flux fastFlux = Flux.just("hare", "cheetah", "squirrel");

Flux firstFlux = Flux.first(slowFlux, fastFlux);

StepVerifier.create(firstFlux)

.expectNext("hare")

.expectNext("cheetah")

.expectNext("squirrel")

.verifyComplete();

}

В этом случае, поскольку медленный Flux не будет публиковать никаких значений до 100 мс после начала публикации быстрого Flux, вновь созданный Flux будет просто игнорировать медленный Flux и публиковать только значения из быстрого Flux.

10.3.3 Преобразование и фильтрация реактивных потоков

Когда данные проходят через поток, вам, вероятно, потребуется отфильтровать некоторые значения и изменить другие значения. В этом разделе мы рассмотрим операции, которые преобразуют и фильтруют данные, проходящие через реактивный поток.

ФИЛЬТРАЦИЯ ДАННЫХ ИЗ РЕАКТИВНЫХ ТИПОВ

Один из самых основных способов фильтрации данных при их поступлении из Flux - просто игнорировать первые записи. Операция skip(), показанная на рисунке 10.10, делает именно это.

Рисунок 10.10. Операция skip пропускает указанное количество сообщений перед передачей оставшихся сообщений в результирующий Flux.

Учитывая Flux с несколькими записями, операция skip() создаст новый Flux, который пропускает заданное количество элементов, прежде чем отдавать остальные элементы из исходного Flux. Следующий метод тестирования показывает, как использовать skip():

@Test

public void skipAFew() {

Flux skipFlux = Flux.just(

"one", "two", "skip a few", "ninety nine", "one hundred")

.skip(3);

StepVerifier.create(skipFlux)

.expectNext("ninety nine", "one hundred")

.verifyComplete();

}

В этом случае у вас есть Flux из пяти String элементов. Вызов метода skip(3) для этого потока создает новый Flux, который пропускает первые три элемента и публикует только последние два элемента.

Но, возможно, вы не хотите пропускать определенное количество элементов, а вместо этого нужно пропустить некоторое количество элементов, определяемое не количеством, а временем. Альтернативная форма операции skip(), показанная на рисунке 10.11, создает Flux, который ожидает, пока не пройдет некоторое заданное время, прежде чем отдавать элементы из исходного Flux.

Рисунок 10.11. Альтернативная форма операции skip ждет, пока не пройдет некоторое время, прежде чем передавать сообщения в результирующий поток.

Следующий метод тестирования использует функцию skip() для создания Flux, который ожидает четыре секунды, прежде чем начинает отдавать какие-либо значения. Поскольку этот Flux был создан из Flux, который имеет односекундную задержку между элементами (используя delayElements()), будут переданы только последние два элемента:

@Test

public void skipAFewSeconds() {

Flux skipFlux = Flux.just(

"one", "two", "skip a few", "ninety nine", "one hundred")

.delayElements(Duration.ofSeconds(1))

.skip(Duration.ofSeconds(4));

StepVerifier.create(skipFlux)

.expectNext("ninety nine", "one hundred")

.verifyComplete();

}

Вы уже видели пример метода take(), но в познакомившись с методом skip(), take() можно рассматривать как противоположность skip (). В то время как функция skip() пропускает первые несколько элементов, функция take() выдает только первые несколько элементов (как показано на диаграмме marble на рис. 10.12):

@Test

public void take() {

Flux nationalParkFlux = Flux.just(

"Yellowstone", "Yosemite", "Grand Canyon",

"Zion", "Grand Teton")

.take(3);

StepVerifier.create(nationalParkFlux)

.expectNext("Yellowstone", "Yosemite", "Grand Canyon")

.verifyComplete();

}

Рис. 10.12 операция take передает только первые несколько сообщений из входящего Flux, а затем отменяет подписку.

Как и skip(), take() также имеет альтернативную форму, основанную на длительности, а не на количестве элементов. Он будет принимать и отдавать столько элементов, сколько проходит через исходный поток, пока не пройдет некоторый период времени, после чего поток завершится. Это показано на рисунке 10.13.

Рис. 10.13. Альтернативная форма операции take передает сообщения в результирующий поток до тех пор, пока не пройдет некоторое время.

В следующем методе тестирования используется альтернативная форма take() для отправки максимально возможного количества элементов в первые 3,5 секунды после подписки:

@Test

public void take() {

Flux nationalParkFlux = Flux.just(

"Yellowstone", "Yosemite", "Grand Canyon",

"Zion", "Grand Teton")

.delayElements(Duration.ofSeconds(1))

.take(Duration.ofMillis(3500));

StepVerifier.create(nationalParkFlux)

.expectNext("Yellowstone", "Yosemite", "Grand Canyon")

.verifyComplete();

}

Операции skip() и take() можно рассматривать как операции фильтрации, где критерии фильтра основаны на количестве или длительности. Для более общей фильтрации значений Flux вы найдете операцию filter() весьма полезной.

При наличии предиката, который решает, будет ли элемент проходить через поток или нет, операция filter() позволяет выборочно публиковать на основе любых критериев, которые вы хотите. Мраморная диаграмма на рисунке 10.14 показывает, как работает filter().

Рисунок 10.14. Входящий Flux может быть отфильтрован так, что полученный Flux получает только сообщения, которые соответствуют заданному предикату.

Чтобы увидеть filter() в действии, рассмотрите следующий метод тестирования:

@Test

public void filter() {

Flux nationalParkFlux = Flux.just(

"Yellowstone", "Yosemite", "Grand Canyon",

"Zion", "Grand Teton")

.filter(np -> !np.contains(" "));

StepVerifier.create(nationalParkFlux)

.expectNext("Yellowstone", "Yosemite", "Zion")

.verifyComplete();

}

Здесь filter() задается предикатом в виде лямбды, которая принимает только String значения без пробелов. Следовательно, «Grand Canyon» и «Grand Teton» отфильтровываются из итогового Flux.

Возможно, вам нужна фильтрация для всех предметов, которые вы уже получили. Операция Different(), как показано на рисунке 10.15, приводит к тому, что Flux публикует только элементы из исходного потока, которые еще не были опубликованы.

Рисунок 10.15. Операция distinct отфильтровывает любые повторяющиеся сообщения.

В следующем тесте только уникальные String значения будут излучаться из Flux:

@Test

public void distinct() {

Flux animalFlux = Flux.just(

"dog", "cat", "bird", "dog", "bird", "anteater")

.distinct();

StepVerifier.create(animalFlux)

.expectNext("dog", "cat", "bird", "anteater")

.verifyComplete();

}

Хотя «dog» и «bird» публикуются дважды из исходного потока, отдельный поток публикует их только один раз.

МАППИНГ РЕАКТИВНЫХ ДАННЫХ

Одной из наиболее распространенных операций, которые вы будете использовать в Flux или Моно, является преобразование опубликованных элементов в какую-либо другую форму или тип. Типы Reactor предлагают операции map() и flatMap() для этой цели.

Операция map() создает Flux, который просто выполняет преобразование, как предписано данной функцией, для каждого объекта, который он получает до его повторной публикации. На рисунке 10.16 показано, как работает операция map().

Рисунок 10.16. Операция map выполняет преобразование входящих сообщений в новые сообщения в результирующем потоке.

В следующем методе тестирования Flux String значения, представляющие баскетболистов, сопоставляются с новы Flux объектами Player:

@Test

public void map() {

Flux playerFlux = Flux

.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")

.map(n -> {

String[] split = n.split("\\s");

return new Player(split[0], split[1]);

});

StepVerifier.create(playerFlux)

.expectNext(new Player("Michael", "Jordan"))

.expectNext(new Player("Scottie", "Pippen"))

.expectNext(new Player("Steve", "Kerr"))

.verifyComplete();

}

Функция, заданная для map() (как лямбда), разбивает входящую String по пробелу и использует полученный массив String-ов для создания объекта Player. Хотя поток, созданный с помощью just(), переносил объекты String, Flux, полученный из map(), переносит объекты Player.

Что важно понимать в map(), так это то, что сопоставление выполняется синхронно, так как каждый элемент публикуется исходным Flux. Если вы хотите выполнить сопоставление асинхронно, вы должны рассмотреть операцию flatMap().

Операция flatMap() требует некоторой мысли и практики, чтобы овладеть всеми навыками. Как показано на рисунке 10.17, вместо простого сопоставления одного объекта другому, как в случае map(), flatMap() сопоставляет каждый объект новому Mono или Flux. Результаты Mono или Flux сведены в новый результирующий Flux. Когда используется вместе с subscribeOn(), flatMap() может раскрыть асинхронную мощь типов Reactor.

Рис. 10.17. Операция плоской карты (flat map) использует промежуточный Flux для выполнения преобразования, следовательно, допускает асинхронные преобразования.

Следующий метод тестирования демонстрирует использование flatMap() и subscribeOn():

@Test

public void flatMap() {

Flux playerFlux = Flux

.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")

.flatMap(n -> Mono.just(n)

.map(p -> {

String[] split = p.split("\\s");

return new Player(split[0], split[1]);

})

.subscribeOn(Schedulers.parallel())

);

List playerList = Arrays.asList(

new Player("Michael", "Jordan"),

new Player("Scottie", "Pippen"),

new Player("Steve", "Kerr"));

StepVerifier.create(playerFlux)

.expectNextMatches(p -> playerList.contains(p))

.expectNextMatches(p -> playerList.contains(p))

.expectNextMatches(p -> playerList.contains(p))

.verifyComplete();

}

Обратите внимание, что flatMap() получает лямбда-функцию, которая преобразует входящую String в Mono типа String. Затем к Mono применяется операция map() для преобразования String в Player.

Если вы остановитесь прямо здесь, результирующий поток будет передавать объекты Player, созданные синхронно в том же порядке, что и в примере с map(). Но операции с Mono завершаются вызовом subscribeOn(), чтобы указать, что каждая подписка должна проходить в параллельном потоке. Следовательно, операции сопоставления для нескольких входящих объектов String могут выполняться асинхронно и параллельно.

Хотя subscribeOn() называется очень похоже на subscribe(), но они совершенно разные. В то время как subscribe() - это глагол, подписывающийся на реактивный поток и эффективно запускающий его, subscribeOn() - является более описательной, определяя, как подписка должна обрабатываться параллельно. Reactor не навязывает какую-либо конкретную модель параллелизма; с помощью subscribeOn() вы можете указать модель параллелизма, используя один из статических методов из планировщиков, который вы хотите использовать. В этом примере вы использовали parallel(), которая использует рабочие потоки из фиксированного пула (размер которого соответствует числу ядер ЦП). Но планировщики поддерживают несколько моделей параллелизма, например, описанных в таблице 10.1.

Таблица 10.1 Модели параллелизма для планировщиков (Schedulers)

Метод планировщика: Описание

.immediate() - Выполняет подписку в текущем потоке.

.single() - Выполняет подписку в одном многоразовом потоке. Повторно использует один и тот же поток для всех абонентов.

.newSingle() - Выполняет подписку в выделенном потоке для каждого вызова.

.elastic() - Выполняет подписку в в рабочем пуле из неограниченного эластичного пула. Новые рабочие потоки создаются по мере необходимости, а простаивающие рабочие потоки удаляются (по умолчанию через 60 секунд).

.parallel() - Выполняет подписку в рабочем пуле из из пула фиксированного размера, размер которого соответствует числу ядер ЦП.

Преимуществом использования flatMap() и subscribeOn() является то, что вы можете увеличить пропускную способность потока, разделив работу между несколькими параллельными потоками. Но поскольку работа выполняется параллельно, без гарантий того, что будет постоянный порядок выполнения потоков, невозможно определить порядок элементов, передаваемых в результирующий поток. Таким образом, StepVerifier может только проверить, что каждый исходящий элемент существует в ожидаемом списке объектов Player и что до завершения потока будет три таких элемента.

БУФЕРИЗАЦИЯ ДАННЫХ В РЕАКТИВНОМ ПОТОКЕ

В процессе обработки данных, проходящих через Flux, может оказаться полезным разбить поток данных на небольшие части. Операция buffer(), показанная на рисунке 10.18, может помочь в этом.

Рисунок 10.18 Операция buffer приводит к листу Flux заданного максимального размера, которые формируется на основе входящего Flux.

Учитывая, что Flux у нас String значений, каждое из которых содержит имя фрукта, вы можете создать новый Flux коллекции List, в которой каждый List содержит не более указанного числа элементов:

@Test

public void buffer() {

Flux fruitFlux = Flux.just(

"apple", "orange", "banana", "kiwi", "strawberry");

Flux> bufferedFlux = fruitFlux.buffer(3);

StepVerifier

.create(bufferedFlux)

.expectNext(Arrays.asList("apple", "orange", "banana"))

.expectNext(Arrays.asList("kiwi", "strawberry"))

.verifyComplete();

}

В этом случае String элементы Flux помещаются в новые Flux коллекции List, содержащие не более трех элементов в каждой. Следовательно, исходный Flux, который передает пять значений String, будет преобразован в Flux, который передает две коллекции List, одна из которых содержит три фрукта, а другая - два фрукта.

И что? Буферизация значений из реактивного потока в нереактивные коллекции List представляется контрпродуктивной. Но когда вы комбинируете buffer() с flatMap(), это позволяет параллельно обрабатывать каждую из коллекций List:

Flux.just(

"apple", "orange", "banana", "kiwi", "strawberry")

.buffer(3)

.flatMap(x ->

Flux.fromIterable(x)

.map(y -> y.toUpperCase())

.subscribeOn(Schedulers.parallel())

.log()

).subscribe();

В этом новом примере вы по-прежнему буферизуете Flux из пяти String значений в Flux коллекций List. Но затем вы применяете flatMap() к этому Flux коллекций List. Это берет каждый List буфер и создает новый Flux из его элементов, а затем применяет к нему операцию map(). Следовательно, каждый буферный List дополнительно обрабатывается параллельно в отдельных потоках.

Чтобы доказать, что это работает, я также включил операцию log() для применения к каждому под-Flux-у. Операция log() просто регистрирует все события Reactive Streams, чтобы вы могли видеть, что на самом деле происходит. В результате в log записываются следующие записи (для краткости удален компонент времени):

[main] INFO reactor.Flux.SubscribeOn.1 -

onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)

[main] INFO reactor.Flux.SubscribeOn.1 - request(32)

[main] INFO reactor.Flux.SubscribeOn.2 -

onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)

[main] INFO reactor.Flux.SubscribeOn.2 - request(32)

[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(APPLE)

[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(KIWI)

[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(ORANGE)

[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(STRAWBERRY)

[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(BANANA)

[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onComplete()

[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onComplete()

Как ясно видно из записей журнала, фрукты в первом буфере (apple, orange и banana) обрабатываются в потоке parallel-1. Между тем, фрукты во втором буфере (kiwi и strawberry) обрабатываются в parallel-2. Как видно из того факта, что записи журнала из каждого буфера сплетены вместе, два буфера обрабатываются параллельно.

Если по какой-то причине вам нужно собрать все, что Flux генерирует в List, вы можете вызвать buffer() без аргументов:

Flux> bufferedFlux = fruitFlux.buffer();

В результате создается новый Flux, который генерирует List, содержащий все элементы, опубликованные исходным Flux. Вы можете добиться того же самого с помощью операции collectList(), показанной на диаграмме на рисунке 10.19.

Рисунок 10.19 В результате операции сбора списка получается Mono, содержащий список всех сообщений, отправляемых входящим Flux.

Вместо того чтобы создавать Flux, который публикует List, collectList() создает Mono, который публикует List. Следующий метод тестирования показывает, как это можно использовать:

@Test

public void collectList() {

Flux fruitFlux = Flux.just(

"apple", "orange", "banana", "kiwi", "strawberry");

Mono> fruitListMono = fruitFlux.collectList();

StepVerifier

.create(fruitListMono)

.expectNext(Arrays.asList(

"apple", "orange", "banana", "kiwi", "strawberry"))

.verifyComplete();

}

Еще более интересный способ сбора элементов, возвращаемых Flux, - это собирать их в Map. Как показано на рисунке 10.20, операция collectMap() приводит к Mono, который публикует Map, заполненную записями, ключ которых рассчитывается данной функцией.

Рис. 10.20. Операция collectMap приводит к получению Mono, содержащему Map сообщений, передаваемым входящим Flux, где ключ выводится из некоторой характеристики входящих сообщений.

Чтобы увидеть collectMap() в действии, взгляните на следующий метод тестирования:

@Test

public void collectMap() {

Flux animalFlux = Flux.just(

"aardvark", "elephant", "koala", "eagle", "kangaroo");

Mono> animalMapMono =

animalFlux.collectMap(a -> a.charAt(0));

StepVerifier

.create(animalMapMono)

.expectNextMatches(map -> {

return

map.size() == 3 &&

map.get('a').equals("aardvark") &&

map.get('e').equals("eagle") &&

map.get('k').equals("kangaroo");

})

.verifyComplete();

}

Источник Flux испускает имена нескольких животных. К этому Flux вы применяете collectMap() для создания нового Mono, который создает Map, где значение ключа определяется первой буквой имени животного, а значение-само имя животного. В случае, если два названия животных начинаются с одной и той же буквы (например, elephant и eagle или koala и kangaroo), последняя запись, проходящая через поток, переопределяет все предыдущие записи.

10.3.4 Выполнение логических операций над реактивными типами

Иногда вам просто нужно знать, соответствуют ли записи, опубликованные Mono или Flux, некоторым критериям. Операции all() и any() выполняют такую логику. Рисунки 10.21 и 10.22 иллюстрируют, как работают all() и any().

Рисунок 10.21 Поток может быть проверен, чтобы убедиться, что все сообщения удовлетворяют некоторому условию в операции all.

Рисунок 10.22 поток может быть проверен, что по крайней мере одно сообщение удовлетворяет некоторому условию any операции.

Предположим, вы хотите знать, что каждая строка, публикуемая Flux, содержит букву a или букву k. Следующий тест показывает, как использовать all() для проверки этого условия:

@Test

public void all() {

Flux animalFlux = Flux.just(

"aardvark", "elephant", "koala", "eagle", "kangaroo");

Mono hasAMono = animalFlux.all(a -> a.contains("a"));

StepVerifier.create(hasAMono)

.expectNext(true)

.verifyComplete();

Mono hasKMono = animalFlux.all(a -> a.contains("k"));

StepVerifier.create(hasKMono)

.expectNext(false)

.verifyComplete();

}

В первом StepVerifier, проверяется наличие буквы a. Операция all применяется к исходному Flux, в результате чего получается Mono типа Boolean. В этом случае все названия животных содержат букву а, поэтому Mono будет содержать true. Но на втором этапе проверки результирующий Mono будет выдавать false, потому что не все имена животных содержат k.

Вместо того, чтобы выполнять проверку "все или ничего", Возможно, будет достаточно, если хотя бы одна запись соответствует условиям. В этом случае операция any() - это то, что вы хотите. Этот новый тестовый случай использует any() для проверки букв t и z:

@Test

public void any() {

Flux animalFlux = Flux.just(

"aardvark", "elephant", "koala", "eagle", "kangaroo");

Mono hasAMono = animalFlux.any(a -> a.contains("t"));

StepVerifier.create(hasAMono)

.expectNext(true)

.verifyComplete();

Mono hasZMono = animalFlux.any(a -> a.contains("z"));

StepVerifier.create(hasZMono)

.expectNext(false)

.verifyComplete();

}

В первом StepVerifier вы видите, что полученный Mono возвращает true, потому что по крайней мере одно имя животного имеет букву t (в частности, elephant). Во втором случае полученное Mono возвращает false, потому что ни одно из имен животных не содержит z.

ИТОГО:

-Реактивное программирование включает в себя создание конвейеров, по которым передаются данные.

-Спецификация Reactive Streams определяет четыре типа: «Издатель» (Publisher), «Подписчик» (Subscriber), «Подписка» (Subscription) и «Трансформер» (Transformer) (который является комбинацией Publisher и Subscriber).

-Проект Reactor реализует Reactive Streams и абстрагирует определения потоков в два основных типа, Flux и Mono, каждый из которых предлагает несколько сотен операций.

-Spring 5 использует Reactor для создания реактивных контроллеров, репозиториев, REST клиентов и другой поддержки реактивной платформы.

Spring in Action Covers Spring 5.0 перевод на русский. Глава 11

Глава 11. Разработка реактивных API

Эта глава охватывает:

-Использование Spring WebFlux

-Написание и тестирование реактивных контроллеров и клиентов

-Использование REST API

-Защита реактивных веб-приложений

Теперь, когда вы хорошо познакомились с реактивным программированием и Project Reactor, вы готовы начать применять эти методы в своих Spring приложениях. В этой главе мы собираемся вернуться к некоторым контроллерам, которые вы написали в главе 6, чтобы воспользоваться преимуществами модели реактивного программирования Spring 5.

Более конкретно, мы собираемся взглянуть на новую реактивную веб-инфраструктуру Spring 5 - Spring WebFlux. Как вы вскоре поймете, Spring WebFlux удивительно похож на Spring MVC, что делает его простым в применении, наряду с тем, что вы уже знаете о создании REST API в Spring.

11.1 Работа с Spring WebFlux

Типичные веб-фреймворки на основе сервлетов, такие как Spring MVC, являются блокирующими и многопоточными по своей природе, используя один поток на соединение. Когда запросы обрабатываются, рабочий поток извлекается из пула потоков для обработки запроса. Тем временем поток запросов блокируется, пока рабочий поток не уведомит его о завершении.

Следовательно, блокирование веб-фреймворков не будет эффективно масштабироваться при большом объеме запросов. Задержка в медленных рабочих потоках усугубляет ситуацию, поскольку для возврата рабочего потока в пул, готового обработать другой запрос, потребуется больше времени. В некоторых случаях такое расположение вполне приемлемо. Фактически, это во многом такой подход, как большинство веб-приложений разрабатывалось более десяти лет. Но времена меняются.

Клиенты этих веб-приложений изменились и не смену людям, иногда просматривающих веб-сайты, пришли люди, часто потребляющих контент и использующих приложения, которые координируются с HTTP API. И в наши дни так называемый Интернет вещей (где люди даже не участвуют) предоставляет автомобили, реактивные двигатели и другие нетрадиционные клиенты, постоянно обменивающиеся данными с веб-API. С увеличением числа клиентов, использующие веб-приложения, масштабируемость становится более важной, чем когда-либо.

Асинхронные веб-инфраструктуры, напротив, обеспечивают более высокую масштабируемость при меньшем количестве потоков - обычно по одному на ядро ЦП. Применяя метод, известный как цикл обработки событий (как показано на рисунке 11.1), эти платформы способны обрабатывать много запросов на поток, что делает затраты на соединение более экономичными.

Рисунок 11.1. Асинхронные веб-фреймворки применяют зацикливание событий для обработки большего количества запросов с меньшим количеством потоков.

В цикле событий все обрабатывается как событие, включая запросы и обратные вызовы от интенсивных операций, таких как операции с базой данных и сетью. Когда требуется дорогостоящая операция, цикл событий регистрирует обратный вызов для выполнения этой операции параллельно, а затем переходит к обработке других событий.

Когда операция завершена, она обрабатывается как событие в цикле событий, так же как и запросы. В результате асинхронные веб-инфраструктуры могут лучше масштабироваться при больших объемах запросов с меньшим количеством потоков, что приводит к снижению накладных расходов на управление потоками.

Spring 5 представил неблокирующую асинхронный веб-фреймворк, основанный в основном на своем Project Reactor, для удовлетворения потребностей в большей масштабируемости в веб-приложениях и API-интерфейсах. Давайте взглянем на Spring WebFlux - реактивный веб-фреймворк для Spring.

11.1.1 Описание Spring WebFlux

Когда команда Spring обдумывала, как добавить модель реактивного программирования в веб-слой, быстро стало очевидно, что это будет трудно сделать без большой работы в Spring MVC. Это будет включать в себя код ветвления, чтобы решить, реагировать ли на запросы реактивно или нет. По сути, результатом будут два веб-фреймворка, упакованные как один, с инструкциями if для разделения реактивных и нереактивных действий.

Вместо того чтобы пытаться встроить реактивную модель программирования в Spring MVC, было решено создать отдельный реактивный веб-фреймворк, заимствуя как можно больше от Spring MVC. Итогом стал Spring WebFlux. Рисунок 11.2 иллюстрирует полный стек веб-разработки, определенный Spring 5.

Рис. 11.2 Spring 5 поддерживает реактивные веб-приложения с новым веб-фрэймворком Web Flux, который является родственным Spring MVC и разделяет многие из ее основных компонентов.

В левой стороне рисунка 11.2 вы видите стек Spring MVC, который был представлен в версии 2.5 Spring Framework. Spring MVC (описанный в главах 2 и 6) расположен поверх API сервлетов Java, для которого требуется контейнер сервлета (например, Tomcat).

Spring WebFlux (с правой стороны) не имеет связей с API сервлета, поэтому он строится поверх реактивного HTTP API, который является реактивным приближением той же функциональности, предоставляемой API сервлета. И поскольку Spring WebFlux не связан с API сервлета, для его запуска не требуется контейнер сервлета. Вместо этого, он может работать на любом неблокирующем веб-контейнере, включая Netty, Undertow, Tomcat, Jetty, или любой контейнер Servlet 3.1 или выше.

На рис. 11.2 наиболее примечательным является верхний левый прямоугольник, представляющий компоненты, общие для Spring MVC и Spring WebFlux, в первую очередь аннотации, используемые для определения контроллеров. Поскольку Spring MVC и Spring WebFlux используют одни и те же аннотации, Spring WebFlux во многом неотличим от Spring MVC.

Прямоугольник в верхнем правом углу представляет альтернативную модель программирования, которая определяет контроллеры с функциональной парадигмой программирования вместо использования аннотаций. Более подробно о функциональной модели веб-программирования Spring мы расскажем в разделе 11.2.

Наиболее существенное различие между Spring MVC и Spring WebFlux сводится к тому, какую зависимость вы добавляете в свою сборку. При работе с Spring WebFlux вам необходимо добавить Spring Boot WebFlux стартер вместо стандартного веб стартера (например, spring-boot-starter-web). В файле проекта pom.xml это выглядит так:

org.springframework.boot

spring-boot-starter-webflux

ПРИМЕЧАНИЕ. Как и с большинством Spring Boot стартеров, этот стартер также можно добавить в проект, установив флажок Reactive Web в Initializr.

Интересным побочным эффектом использования WebFlux вместо Spring MVC является то, что встроенным сервером по умолчанию для WebFlux является Netty вместо Tomcat. Netty - один из нескольких асинхронных серверов, управляемых событиями, и он естественным образом подходит для реактивной веб-инфраструктуры, такой как Spring WebFlux.

Помимо использования другой стартер зависимости, методы контроллера Spring WebFlux обычно принимают и возвращают реактивные типы, такие как Mono и Flux, вместо типов доменов и коллекций. Контроллеры Spring WebFlux также могут работать с типами RxJava, такими как Observable, Single и Completable.

РЕАКТИВНЫЙ SPRING MVC?

Хотя Spring WebFlux контроллеры обычно возвращают Mono и Flux, это не означает, что Spring MVC не может работать с реактивными типами. Методы контроллера Spring MVC также могут возвращать Mono или Flux, если хотите.

Разница в том, как эти типы используются. В то время как Spring WebFlux представляет собой действительно реактивный веб-фрэйворк, позволяющую обрабатывать запросы в цикле обработки событий, Spring MVC основан на сервлетах, полагаясь на многопоточность для обработки нескольких запросов.

Давайте включим Spring WebFlux в работу, переписав некоторые из контроллеров API Taco Cloud, чтобы использовать преимущества Spring WebFlux.

11.1.2 Написание реактивных контроллеров

Возможно, вы помните, что в главе 6 вы создали несколько контроллеров для REST API Taco Cloud. Эти контроллеры имели методы обработки запросов, которые имели дело с вводом и выводом с точки зрения типов доменов (таких как Order и Taco) или наборов этих типов доменов. В качестве напоминания рассмотрим следующий фрагмент из DesignTacoController, который вы написали в главе 6:

@RestController

@RequestMapping(path="/design", produces="application/json")

@CrossOrigin(origins="*")

public class DesignTacoController {

...

@GetMapping("/recent")

public Iterable recentTacos() {

PageRequest page = PageRequest.of(

0, 12, Sort.by("createdAt").descending());

return tacoRepo.findAll(page).getContent();

}

...

}

Как уже было ранее написано, контроллер latestTacos() обрабатывает HTTP-запросы GET для /design/recent, чтобы вернуть список недавно созданных тако. Более конкретно, он возвращает Iterable с типом Taco. Это в первую очередь потому, что это то, что возвращается из метода findAll() репозитьтлоия, или, точнее, из метода getContent() объекта Page, возвращаемого из findAll().

Это прекрасно работает, но Iterable не является реактивным типом. Вы не сможете применить к нему какие-либо реактивные операции, и при этом вы не сможете позволить фрэймворку использовать его как реактивный тип для разделения любой работы на несколько потоков. То, что вы хотели бы, чтобы recentTacos() возвращали Flux .

Простой, но несколько ограниченный вариант здесь - переписать recentTacos() для преобразования Iterable во Flux. А заодно избавимся от кода паджинации и заменим его вызовом метода take() у Flux:

@GetMapping("/recent")

public Flux recentTacos() {

return Flux.fromIterable(tacoRepo.findAll()).take(12);

}

Используя Flux.fromIterable(), вы конвертируете Iterable в Flux. И теперь, когда вы работаете с Flux, вы можете использовать операцию take(), чтобы ограничить возвращаемый Flux максимум 12 объектами Taco. Код не только проще, он также имеет дело с реактивным Flux, а не простым Iterable.

До сих пор написание реактивного кода было выигрышным шагом. Но было бы еще лучше, если бы репозиторий давало вам Flux для начала, чтобы вам не нужно было выполнять преобразование. Если бы это было так, то recentTacos() можно было бы написать так:

@GetMapping("/recent")

public Flux recentTacos() {

return tacoRepo.findAll().take(12);

}

Это даже лучше! В идеале реактивный контроллер будет вершиной стека, который является реактивным от начала до конца, включая контроллеры, репозитории, базу данных и любые службы, которые могут находиться между ними. Такой сквозной реактивный стек показан на рис. 11.3.

Рис. 11.3. Чтобы максимизировать преимущества реактивного веб-фрэймворка, она должна быть частью полного сквозного реактивного стека.

Такой end-to-end стек требует, чтобы репозиторий был написан так, чтобы он возвращал Flux вместо Iterable. Мы рассмотрим написание реактивных репозиториев в следующей главе, но вот краткий обзор того, как может выглядеть реактивный TacoRepository:

public interface TacoRepository

extends ReactiveCrudRepository {

}

Однако наиболее важно отметить, что помимо работы с Flux вместо Iterable,, а также того, как вы получаете этот Flux, программная модель для определения реактивного контроллера WebFlux ничем не отличается от нереактивного контроллера Spring MVC. Оба аннотируются с помощью @RestController и высокоуровневого @RequestMapping на уровне класса. И оба имеют функции обработки запросов, которые аннотируются с помощью @GetMapping на уровне метода. Разница только в том, какой тип возвращают методы обработчика.

Еще одно важное замечание заключается в том, что, хотя вы получаете Flux из репозитория, вы можете вернуть его без вызова subscribe(). Действительно, фреймворк вызовет для вас метод subscribe(). Это означает, что при обработке запроса /design/recent будет вызван метод recentTacos(), который вернется до того, как данные будут даже получены из базы данных!

ВОЗВРАЩЕНИЕ ОДИНОЧНЫХ ЗНАЧЕНИЙ

В качестве другого примера рассмотрим метод tacoById() из DesignTacoController, как он был написан в главе 6:

@GetMapping("/{id}")

public Taco tacoById(@PathVariable("id") Long id) {

Optional optTaco = tacoRepo.findById(id);

if (optTaco.isPresent()) {

return optTaco.get();

}

return null;

}

Здесь этот метод обрабатывает запросы GET для /design/{id} и возвращает одиночный объект Taco. Поскольку findById() репозитория возвращает Optional, вам пришлось написать какой-то неуклюжий код, чтобы справиться с этим. Но предположим на минуту, что findById() возвращает Mono вместо Optional. В этом случае вы можете переписать tacoById(), чтобы выглядело следующим образом:

@GetMapping("/{id}")

public Mono tacoById(@PathVariable("id") Long id) {

return tacoRepo.findById(id);

}

Вау! Это намного проще. Однако более важно то, что, возвращая Mono вместо Taco, вы позволяете Spring WebFlux обрабатывать ответ реагирующим образом. Следовательно, ваш API будет лучше масштабироваться в ответ на большие нагрузки.

РАБОТА С ТИПАМИ RXJAVA

Стоит отметить, что хотя типы Reactor, такие как Flux и Mono, являются естественным выбором при работе с Spring WebFlux, вы также можете выбрать работу с типами RxJava, такими как Observable и Single. Например, предположим, что между DesignTacoController и внутренним репозиторием находится служба, которая работает в терминах типов RxJava. В этом случае метод recentTacos() может быть написан так:

@GetMapping("/recent")

public Observable recentTacos() {

return tacoService.getRecentTacos();

}

Аналогично, метод tacoById() может быть написан для работы с RxJava Single, а не Mono:

@GetMapping("/{id}")

public Single tacoById(@PathVariable("id") Long id) {

return tacoService.lookupTaco(id);

}

Кроме того, методы контроллера Spring WebFlux также могут возвращать RxJava Completable, который эквивалентен Mono в Reactor. WebFlux также может возвращать Flowable в качестве альтернативы Observable или Reactor Flux.

РЕАКТИВНАЯ ОБРАБОТКА ВХОДНЫХ ДАННЫХ

До сих пор мы интересовались только тем, какие реактивные типы возвращают методы контроллера. Но с Spring WebFlux вы также можете принять Mono или Flux в качестве входных данных для метода-обработчика. Для демонстрации рассмотрим оригинальную реализацию postTaco() из DesignTacoController:

@PostMapping(consumes="application/json")

@ResponseStatus(HttpStatus.CREATED)

public Taco postTaco(@RequestBody Taco taco) {

return tacoRepo.save(taco);

}

postTaco() не только возвращает простой объект Taco, но также принимает объект Taco, связанный с содержимым в теле запроса. Это означает, что postTaco() не может быть вызван до тех пор, пока полезная нагрузка запроса не будет полностью подготовлена для создания экземпляра объекта Taco. Это означает, что postTaco() не может быть возвращен, пока не завершится блокирующий вызов метода save() репозитория. Другими словами, запрос блокируется дважды: при поступлении в postTaco() и снова внутри postTaco(). Но, применив небольшое реактивное кодирование к postTaco(), вы можете сделать его полностью неблокирующим методом обработки запросов:

@PostMapping(consumes="application/json")

@ResponseStatus(HttpStatus.CREATED)

public Mono postTaco(@RequestBody Mono tacoMono) {

return tacoRepo.saveAll(tacoMono).next();

}

Здесь postTaco() принимает Mono и вызывает метод saveAll() хранилища, который, как вы увидите в следующей главе, принимает любую реализацию Reactive Streams Publisher, включая Mono или Flux. Метод saveAll() возвращает Flux, но поскольку вы работаете с Mono, вы знаете, что существует не более одного Taco, который будет опубликован Flux. Поэтому вы можете вызвать next(), чтобы получить Mono , который вернется из postTaco().

Принимая Mono в качестве входных данных, метод вызывается немедленно, не дожидаясь готовности Taco на основе тела запроса. И поскольку репозиторий также является реактивным, он примет Mono и немедленно вернет Flux, из которого вы вызываете next(), и вернете полученный Mono ... и все это еще до того, как запрос будет обработан!

Spring WebFlux является фантастической альтернативой Spring MVC, предлагая возможность написания реактивных веб-приложений с использованием той же модели разработки, что и Spring MVC. Но у Spring 5 есть еще одна новая хитрость. Давайте посмотрим, как создавать реагирующие API, используя новый функциональный стиль программирования Spring 5.

11.2 Определение функциональных обработчиков запросов

Модель программирования Spring MVC, основанная на аннотациях, существует начиная с Spring 2.5 и пользуется большой популярностью. Это имеет несколько недостатков.

Во-первых, любое программирование на основе аннотаций включает разделение в определении того, что аннотация должна делать и как она должна это делать. Сами аннотации определяют, что; а как это определено в другом месте в рамках кода. Это усложняет модель программирования, когда речь заходит о какой-либо настройке или расширении, поскольку такие изменения требуют работы в коде, внешнем по отношению к аннотации. Более того, отладка такого кода сложна, потому что вы не можете установить точку останова для аннотации.

Кроме того, поскольку популярность Spring продолжает расти, разработчики, впервые познакомившиеся с Spring в других языках и фреймворках, могут обнаружить что Spring MVC (и WebFlux) на основе аннотаций совсем не такой, как они уже его знают. В качестве альтернативы WebFlux Spring 5 представил новую модель функционального программирования для определения реактивных API.

Эта новая модель программирования используется больше как библиотека и меньше как фреймворк, позволяя сопоставлять запросы с кодом обработчика без аннотаций. Написание API с использованием модели функционального программирования Spring включает четыре основных типа:

-RequestPredicate — объявляет виды запросов, которые будут обработаны.

-RouterFunction - бъявляет, как соответствующий запрос должен быть направлен в код обработчика.

-ServerRequest - представляет собой HTTP-запрос, включая доступ к информации в header и body.

-ServerResponse - представляет ответ HTTP, включая информацию header и body

В качестве простого примера, который объединяет все эти типы, рассмотрим следующий пример Hello World:

package demo;

import static org.springframework.web.

reactive.function.server.RequestPredicates.GET;

import static org.springframework.web.

reactive.function.server.RouterFunctions.route;

import static org.springframework.web.

reactive.function.server.ServerResponse.ok;

import static reactor.core.publisher.Mono.just;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.reactive.function.server.RouterFunction;

@Configuration

public class RouterFunctionConfig {

@Bean

public RouterFunction helloRouterFunction() {

return route(GET("/hello"),

request -> ok().body(just("Hello World!"), String.class));

}

}

Первое, на что нужно обратить внимание, это то, что вы произвели статический импорт нескольких вспомогательных классов, которые вы можете использовать для создания вышеупомянутых функциональных типов. Вы также статически импортировали Mono, чтобы остальную часть кода было легче читать и понимать.

В этом классе аннотированном как @Configuration у вас есть один метод @Bean типа RouterFunction . Как уже упоминалось, RouterFunction объявляет сопоставления между одним или несколькими объектами RequestPredicate и функциями, которые будут обрабатывать соответствующие запрос(ы).

Метод route() из RouterFunctions принимает два параметра: RequestPredicate и функцию для обработки совпадающих запросов. В этом случае метод GET() из RequestPredicates объявляет RequestPredicate, который совпадает с HTTP-запросами GET для пути /hello.

Что касается функции-обработчика, она написана как лямбда, хотя она также может быть ссылкой на метод. Хотя это явно не объявлено, лямбда-обработчик принимает ServerRequest в качестве параметра. Он возвращает ServerResponse, используя ok() из ServerResponse и body() из BodyBuilder, который был возвращен из ok(). Это было сделано для того, чтобы создать ответ с кодом состояния HTTP 200 (ОК) и полезной нагрузкой body с надписью Hello World!

Метод helloRouterFunction() объявляет RouterFunction, которая обрабатывает только один вид запроса. Но если вам нужно обработать запрос другого типа, вам не нужно писать другой метод @Bean, хотя вы можете это сделать. Вам нужно только вызвать andRoute(), чтобы объявить другое сопоставление RequestPredicate-to-function. Например, вот как вы можете добавить другой обработчик для запросов GET для /bye:

@Bean

public RouterFunction helloRouterFunction() {

return route(GET("/hello"),

request -> ok().body(just("Hello World!"), String.class))

.andRoute(GET("/bye"),

request -> ok().body(just("See ya!"), String.class));

}

Hello World примеры отлично подходят для знакомства с чем-то новым. Но давайте немного расширим этот пример и посмотрим, как использовать функциональную модель веб-программирования Spring для обработки запросов, которые напоминают реальные сценарии.

Чтобы продемонстрировать, как функциональная модель программирования может использоваться в реальном приложении, давайте заново напишем функционал DesignTacoController в функциональном стиле. Следующий класс конфигурации является функциональным аналогом DesignTacoController:

@Configuration

public class RouterFunctionConfig {

@Autowired

private TacoRepository tacoRepo;

@Bean

public RouterFunction routerFunction() {

return route(GET("/design/taco"), this::recents)

.andRoute(POST("/design"), this::postTaco);

}

public Mono recents(ServerRequest request) {

return ServerResponse.ok()

.body(tacoRepo.findAll().take(12), Taco.class);

}

public Mono postTaco(ServerRequest request) {

Mono taco = request.bodyToMono(Taco.class);

Mono savedTaco = tacoRepo.save(taco);

return ServerResponse

.created(URI.create(

"http://localhost:8080/design/taco/" +

savedTaco.getId()))

.body(savedTaco, Taco.class);

}

}

Как вы можете видеть, метод routerFunction() объявляет bean RouterFunction как в примере Hello World. Но это зависит от того, какие типы запросов обрабатываются и как они обрабатываются. Но он отличается тем, какие типы запросов обрабатываются и как они обрабатываются. В этом случае функция маршрутизатора создается для обработки запросов GET для /design/taco и POST для /design.

Маршруты обрабатываются ссылками на методы. Лямбды хороши, когда поведение функции RouterFunction относительно простое и краткое. Во многих случаях, однако, лучше извлечь эту функциональность в отдельный метод (или даже в отдельный метод в отдельном классе), чтобы поддерживать читаемость кода.

Запросы GET на /design/taco будут обрабатываться методом recents(). Он использует внедренный TacoRepository для извлечения Mono, из которого он берет 12 элементов. А запросы POST для /design обрабатываются методом postTaco(), который извлекает Mono из входящего ServerRequest. Затем метод postTaco() использует TacoRepository, чтобы сохранить его, прежде чем вернуть Mono, который возвращающийся из метода save().

11.3 Тестирование реактивных контроллеров

Когда дело дойдет до тестирования реактивных контроллеров, Spring 5 не оставит нас в беде. Действительно, Spring 5 представил WebTestClient, новую утилиту для тестирования, которая упрощает написание тестов для реактивных контроллеров, написанных с использованием Spring WebFlux. Чтобы увидеть, как писать тесты с помощью WebTestClient, давайте используем его для тестирования метода recentTacos() из DesignTacoController, который вы написали в разделе 11.1.2.

11.3.1. Тестирование GET запросов

Одна вещь, которую мы хотели бы заявить о методе recentTacos(), заключается в том, что если для пути /design/recent выдается запрос HTTP GET, то ответ будет содержать полезную нагрузку JSON с не более чем 12 тако. Тестовый класс в следующем листинге - хорошее начало.

Листинг 11.1. Использование WebTestClient для тестирования DesignTacoController

package tacos;

import static org.mockito.Mockito.*;

import java.util.ArrayList;

import java.util.List;

import org.junit.Test;

import org.mockito.Mockito;

import org.springframework.http.MediaType;

import org.springframework.test.web.reactive.server.WebTestClient;

import reactor.core.publisher.Flux;

import tacos.Ingredient.Type;

import tacos.data.TacoRepository;

import tacos.web.api.DesignTacoController;

public class DesignTacoControllerTest {

@Test

public void shouldReturnRecentTacos() {

Taco[] tacos = { //Задание тестовых данных

testTaco(1L), testTaco(2L),

testTaco(3L), testTaco(4L),

testTaco(5L), testTaco(6L),

testTaco(7L), testTaco(8L),

testTaco(9L), testTaco(10L),

testTaco(11L), testTaco(12L),

testTaco(13L), testTaco(14L),

testTaco(15L), testTaco(16L)};

Flux tacoFlux = Flux.just(tacos);

TacoRepository tacoRepo = Mockito.mock(TacoRepository.class);

when(tacoRepo.findAll()).thenReturn(tacoFlux); //Mocks TacoRepository

WebTestClient testClient = WebTestClient.bindToController(

new DesignTacoController(tacoRepo))

.build(); //Создание WebTestClient

testClient.get().uri("/design/recent")

.exchange() //Запрашивает последние тако

.expectStatus().isOk() //Проверяет ожидаемый ответ

.expectBody()

.jsonPath("$").isArray()

.jsonPath("$").isNotEmpty()

.jsonPath("$[0].id").isEqualTo(tacos[0].getId().toString())

.jsonPath("$[0].name").isEqualTo("Taco 1").jsonPath("$[1].id")

.isEqualTo(tacos[1].getId().toString()).jsonPath("$[1].name")

.isEqualTo("Taco 2").jsonPath("$[11].id")

.isEqualTo(tacos[11].getId().toString())

...

.jsonPath("$[11].name").isEqualTo("Taco 12").jsonPath("$[12]")

.doesNotExist();

.jsonPath("$[12]").doesNotExist();

}

...

}

Первое, что должен сделать метод shouldReturnRecentTacos(), - это установить тестовые данные в форме Flux. Этот Flux затем предоставляется как возвращаемое значение из метода findAll() фиктивного (mock) TacoRepository.

Что касается Taco объектов, которые будут опубликованы Flux, они создаются с помощью служебного метода testTaco(), который при присвоении номера создает объект Taco, ID и имя которого основаны на этом числе. Метод testTaco() реализован следующим образом:

private Taco testTaco(Long number) {

Taco taco = new Taco();

taco.setId(UUID.randomUUID());

taco.setName("Taco " + number);

List ingredients = new ArrayList<>();

ingredients.add(

new IngredientUDT("INGA", "Ingredient A", Type.WRAP));

ingredients.add(

new IngredientUDT("INGB", "Ingredient B", Type.PROTEIN));

taco.setIngredients(ingredients);

return taco;

}

Для простоты все тестовые тако будут иметь одинаковые два ингредиента. Но их ID и имя будут определяться по указанному номеру.

Между тем, вернувшись в метод shouldReturnRecentTacos(), вы создали экземпляр DesignTacoController, внедрив в конструктор фиктивный TacoRepository. Контроллер передается в WebTestClient.bindToController() для создания экземпляра WebTestClient.

Завершив настройку, вы теперь готовы использовать WebTestClient для отправки запроса GET в /design/recent и проверки того, что ответ соответствует вашим ожиданиям. Вызов get().uri("/design/recent") описывает запрос, который вы хотите выполнить. Затем вызов метода exchange() отправляет запрос, который будет обработан контроллером, с которым связан WebTestClient - DesignTacoController.

Наконец, вы можете подтвердить, что ответ соответствует ожиданиям. Вызывая waitStatus(), вы утверждаете, что ответ имеет код состояния HTTP 200 (OK). После этого вы видите несколько вызовов jsonPath(), которые задают условия, что JSON в теле ответа имеет значения, которые он должен иметь. Последнее условие проверяет, что 12-й элемент (в массиве, начинающемся с нуля) не существует, поскольку в результате никогда не должно быть более 12 элементов.

Если JSON возвращается сложным, с большим количеством данных или сильно вложенными данными, это может быть утомительно использовать jsonPath(). Фактически, я пропустил многие вызовы jsonPath() в листинге 11.1, чтобы сэкономить место. В тех случаях, когда использование функции jsonPath() может быть неудобным, WebTestClient предлагает функцию json(), которая принимает параметр String, содержащий JSON, для сравнения ответа с ним.

Например, предположим, что вы создали полный ответ JSON в файле с именем recent-tacos.json и поместили его в путь к классам с путем /tacos. Затем вы можете переписать условия WebTestClient, чтобы они выглядели так:

ClassPathResource recentsResource =

new ClassPathResource("/tacos/recent-tacos.json");

String recentsJson = StreamUtils.copyToString(

recentsResource.getInputStream(), Charset.defaultCharset());

testClient.get().uri("/design/recent")

.accept(MediaType.APPLICATION_JSON)

.exchange()

.expectStatus().isOk()

.expectBody()

.json(recentsJson);

Поскольку json() принимает String, вы должны сначала загрузить ресурс classpath в String. К счастью, Spring StreamUtils делает это играючи с copyToString(). String, возвращаемый функцией copyToString(), будет содержать весь JSON, который вы ожидаете получить в ответе на ваш запрос. Если передать его методу json(), контроллер получит правильный вывод.

Другой вариант, предлагаемый WebTestClient, позволяет сравнивать тело ответа со списком значений. МетодwellBodyList() принимает либо Class, либо ParameterizedTypeReference, указывающий тип элементов в списке, и возвращает объект ListBodySpec, для которого можно делать утверждения. Используя expectBodyList(), вы можете переписать тест, чтобы использовать подмножество тех же самых тестовых данных, которые вы использовали для создания моковского TacoRepository:

testClient.get().uri("/design/recent")

.accept(MediaType.APPLICATION_JSON)

.exchange()

.expectStatus().isOk()

.expectBodyList(Taco.class)

.contains(Arrays.copyOf(tacos, 12));

Здесь вы утверждаете, что тело ответа содержит список, содержащий те же элементы, что и первые 12 элементов исходного массива Taco, созданного вами в начале тестового метода.

11.3.2 Тестирование POST-запросов

WebTestClient может делать больше, чем просто проверять GET запросы в контроллерах. Его также можно использовать для тестирования любого метода HTTP, включая запросы GET, POST, PUT, PATCH, DELETE и HEAD. Таблица 11.1 отображает методы HTTP на методы WebTestClient.

Таблица 11.1. WebTestClient тестирует любые запросы к контроллерам Spring WebFlux.

HTTP методы : WebTestClient метод

GET : .get()

POST : .post()

PUT : .put()

PATCH : .patch()

DELETE : .delete()

HEAD : .head()

В качестве примера тестирования еще один метод тестирования HTTP-запроса к контроллеру Spring WebFlux, давайте посмотрим на другой тест с DesignTacoController. На этот раз вы напишете тест конечной точки создания taco вашего API, отправив POST запрос в /design:

@Test

public void shouldSaveATaco() {

TacoRepository tacoRepo = Mockito.mock(

TacoRepository.class); //Устанавливает тестовые данные

Mono unsavedTacoMono = Mono.just(testTaco(null));

Taco savedTaco = testTaco(null);

savedTaco.setId(1L);

Mono savedTacoMono = Mono.just(savedTaco);

when(tacoRepo.save(any())).thenReturn(savedTacoMono); //Mocks TacoRepository

WebTestClient testClient = WebTestClient.bindToController( //Создание WebTestClient

new DesignTacoController(tacoRepo)).build();

testClient.post() //POST для тако

.uri("/design")

.contentType(MediaType.APPLICATION_JSON)

.body(unsavedTacoMono, Taco.class)

.exchange()

.expectStatus().isCreated() //Проверяет ответ

.expectBody(Taco.class)

.isEqualTo(savedTaco);

}

Как и в предыдущем методе тестирования, shouldSaveATaco начинается с настройки некоторых тестовых данных, моккинга TacoRepository, и создания WebTestClient, который привязан к контроллеру. Затем он использует WebTestClient для отправки POST запроса в /design с телом типа application/json и полезной нагрузкой, которая является JSON-сериализованной формой Taco в несохраненном Mono. После выполнения exchange() тест утверждает, что ответ имеет статус HTTP 201 (CREATED) и полезную нагрузку в теле, равную сохраненному объекту Taco.

11.3.3. Тестирование на живом сервере

Тесты, которые вы написали до сих пор, основывались на моковской реализации среды Spring WebFlux, так что реальный сервер не понадобился бы. Но вам может потребоваться протестировать контроллер WebFlux в контексте сервера, такого как Netty или Tomcat, и, возможно, с хранилищем или другими зависимостями. То есть вы должны быть способны написать интеграционный тест.

Чтобы написать интеграционный тест WebTestClient, начните с аннотирования класса теста с помощью @RunWith и @SpringBootTest, как и любого другого интеграционного теста Spring Boot:

@RunWith(SpringRunner.class)

@SpringBootTest(webEnvironment=WebEnvironment.RANDOM_PORT)

public class DesignTacoControllerWebTest {

@Autowired

private WebTestClient testClient;

}

Установив для атрибута webEnvironment значение WebEnvironment.RANDOM_PORT, вы просите Spring запустить работающий сервер, прослушивающий случайным образом выбранный порт (Вы могли бы также установить для webEnvironment значение WebEnvironment.DEFINED_PORT и указать порт с атрибутом properties, но это обычно нежелательно. Это открывает риск столкновения портов с работающим сервером).

Вы, думаю, заметите, что вы также автоматически подключили WebTestClient к тестовому классу. Это означает не только то, что вам больше не нужно создавать его в своих методах тестирования, но и то, что вам не нужно указывать полный URL-адрес при отправке запросов. Это потому, что WebTestClient будет настроен, чтобы узнать, на каком порту работает тестовый сервер. Теперь вы можете переписать shouldReturnRecentTacos() в качестве интеграционного теста, в котором используется автоматическая привязка WebTestClient:

@Test

public void shouldReturnRecentTacos() throws IOException {

testClient.get().uri("/design/recent")

.accept(MediaType.APPLICATION_JSON).exchange()

.expectStatus().isOk()

.expectBody()

.jsonPath("$[?(@.id == 'TACO1')].name")

.isEqualTo("Carnivore")

.jsonPath("$[?(@.id == 'TACO2')].name")

.isEqualTo("Bovine Bounty")

.jsonPath("$[?(@.id == 'TACO3')].name")

.isEqualTo("Veg-Out");

}

Вы, несомненно, заметили, что в этой новой версии shouldReturnRecentTacos() кода гораздо меньше. Больше нет необходимости создавать WebTestClient, потому что вы будете использовать экземпляр с автосвязыванием. И нет необходимости издеваться над TacoRepository, потому что Spring создаст экземпляр DesignTacoController и внедрит его в настоящий TacoRepository. В этой новой версии метода теста вы используете выражения JSONPath для проверки значений, передаваемых из базы данных.

WebTestClient полезен, когда в ходе теста вам нужно использовать API, предоставляемый контроллером WebFlux. Но что делать, когда само ваше приложение использует какой-то другой API? Давайте обратим наше внимание на клиентскую сторону реактивной веб-истории Spring и посмотрим, как WebClient предоставляет REST-клиент, который работает с реактивными типами, такими как Mono и Flux.

11.4 Использование реактивного API REST

В главе 7 вы использовали RestTemplate для отправки клиентских запросов в Taco Cloud API. RestTemplate является старым таймером, появившимся в Spring версии 3.0. В свое время он использовался для бесчисленных запросов от имени приложений, которые его используют.

Но все методы, предоставляемые RestTemplate, работают с нереактивными типами доменов и коллекциями. Это означает, что если вы хотите работать с данными ответа реактивным способом, вам нужно обернуть их в Flux или Mono. И если у вас уже есть Flux или Mono, и вы хотите отправить их в запросе POST или PUT, вам нужно будет извлечь данные в нереактивный тип, прежде чем делать запрос.

Было бы хорошо, если бы был способ использовать RestTemplate изначально с реактивными типами. Не бойтесь. Spring 5 предлагает WebClient в качестве реактивной альтернативы RestTemplate. WebClient позволяет отправлять и получать реактивные типы при отправке запросов на внешние API.

Использование WebClient сильно отличается от использования RestTemplate. Вместо того, чтобы иметь несколько методов для обработки различных типов запросов, WebClient имеет свободный интерфейс в стиле конструктора, который позволяет вам описывать и отправлять запросы. Общий шаблон использования для работы с WebClient:

-Создать экземпляр WebClient (или внедрить bean-компонент WebClient)

-Укажите HTTP метод запроса на отправку

-Укажите URI и любые заголовки, которые должны быть в запросе

-Отправить request

-Получить response

Давайте рассмотрим несколько примеров работы WebClient, начиная с того, как использовать WebClient для отправки HTTP-запросов GET.

11.4.1 GET ресурсов

В качестве примера использования WebClient, предположим, что вам нужно извлечь объект Ingredient по его идентификатору из Taco Cloud API. Используя RestTemplate, вы можете использовать метод getForObject(). Но с WebClient вы создаете запрос, получаете ответ, а затем извлекаете Mono, который публикует объект Ingredient:

Mono ingredient = WebClient.create()

.get()

.uri("http://localhost:8080/ingredients/{id}", ingredientId)

.retrieve()

.bodyToMono(Ingredient.class);

ingredient.subscribe(i -> { ... })

Здесь вы создаете новый экземпляр WebClient с помощью create(). Затем вы используете get() и uri(), чтобы определить запрос GET для http://localhost:8080/ingredients/{id}, где заполнитель {id} будет заменен значением ingredientId.. Метод retrieve() выполняет запрос. Наконец, вызов bodyToMono() извлекает полезную нагрузку ответа в Mono, к которому вы можете продолжить применять дополнительные операции Mono.

Чтобы применить дополнительные операции к Mono, возвращенному из bodyToMono(), важно подписаться на него еще до того, как запрос будет отправлен. Делать запросы, которые могут вернуть коллекцию значений, так же просто. Например, следующий фрагмент кода выбирает все ингредиенты:

Flux ingredients = WebClient.create()

.get()

.uri("http://localhost:8080/ingredients")

.retrieve()

.bodyToFlux(Ingredient.class);

ingredients.subscribe(i -> { ... })

В большинстве случаев выборка нескольких элементов аналогична отправке запроса на один элемент. Большая разница в том, что вместо того, чтобы использовать bodyToMono() для извлечения тела ответа в Mono, вы используете bodyToFlux() для извлечения его во Flux.

Как и в случае с bodyToMono(), Flux, возвращенный из bodyToFlux(), еще не подписан. Это позволяет применять дополнительные операции (фильтры, map-ы и т. д.) к Flux до того, как данные начнут проходить через него. Поэтому важно подписаться на получившийся Flux, иначе запрос даже не будет отправлен.

ВЫПОЛНЕНИЕ ЗАПРОСОВ С БАЗОВЫМ URI

Вы можете использовать общий базовый URI для множества разных запросов. В этом случае может быть полезно создать bean-компонент WebClient с базовым URI и внедрить его везде, где это необходимо. Такой bean может быть объявлен так:

@Bean

public WebClient webClient() {

return WebClient.create("http://localhost:8080");

}

Затем в любом месте, где вам нужно сделать запросы с использованием этого базового URI, WebClient компонент может быть внедрен и использован следующим образом:

@Autowired

WebClient webClient;

public Mono getIngredientById(String ingredientId) {

Mono ingredient = webClient

.get()

.uri("/ingredients/{id}", ingredientId)

.retrieve()

.bodyToMono(Ingredient.class);

ingredient.subscribe(i -> { ... })

}

Поскольку WebClient уже был создан, вы можете сразу приступить к работе, вызвав get(). Что касается URI, то при вызове uri() необходимо указать только путь относительно базового URI.

ТАЙМ-АУТ ДЛЯ ДЛИТЕЛЬНЫХ ЗАПРОСОВ

Единственное, на что вы можете рассчитывать, - это то, что сети не всегда надежны или так быстры, как вы ожидаете. Или, может быть, удаленный сервер вяло обрабатывает запрос. В идеале, запрос к удаленной службе будет возвращен в разумные сроки. Но если нет, было бы здорово, если бы клиент не замер в ожидании ответа слишком долго.

Чтобы избежать задержки клиентских запросов медленной сетью или службой, можно использовать метод timeout() из Flux или Mono, чтобы ограничить время ожидания публикации данных. В качестве примера рассмотрим, как можно использовать функцию timeout() при извлечении данных ингредиента:

Flux ingredients = WebClient.create()

.get()

.uri("http://localhost:8080/ingredients")

.retrieve()

.bodyToFlux(Ingredient.class);

ingredients

.timeout(Duration.ofSeconds(1))

.subscribe(

i -> { ... },

e -> {

// ошибка тайм-аута обработки

})

Как вы можете видеть, прежде чем подписаться на Flux, вы вызвали timeout(), указав продолжительность 1 секунда. Если запрос может быть выполнен менее чем за 1 секунду, то нет никаких проблем. Но если запрос занимает больше 1 секунды, он истекает и вызывается обработчик ошибок, указанный в качестве второго параметра для subscribe().

11.4.2 Отправка ресурсов

Отправка данных с помощью WebClient не сильно отличается от получения данных. В качестве примера предположим, что у вас есть Mono и вы хотите отправить запрос POST с Ingredient, опубликованным Mono, в URI с относительным путем /ingredients. Все, что вам нужно сделать, это использовать метод post() вместо get() и указать, что Mono будет использоваться для заполнения тела запроса путем вызова body():

Mono ingredientMono = ...;

Mono result = webClient

.post()

.uri("/ingredients")

.body(ingredientMono, Ingredient.class)

.retrieve()

.bodyToMono(Ingredient.class);

result.subscribe(i -> { ... })

Если у вас нет Mono или Flux для отправки, но вместо этого есть объект домена, вы можете использовать syncBody(). Например, предположим, что вместо Mono у вас есть Ingredient, который вы хотите отправить в теле запроса:

Ingedient ingredient = ...;

Mono result = webClient

.post()

.uri("/ingredients")

.syncBody(ingredient)

.retrieve()

.bodyToMono(Ingredient.class);

result.subscribe(i -> { ... })

Если вместо запроса POST вы хотите обновить Ingredient с помощью запроса PUT, вы вызываете put() вместо post() и соответственно корректируете путь URI:

Mono result = webClient

.put()

.uri("/ingredients/{id}", ingredient.getId())

.syncBody(ingredient)

.retrieve()

.bodyToMono(Void.class)

.subscribe();

Запросы PUT обычно имеют пустые полезные нагрузки ответа, поэтому вы должны указать bodyToMono() вернуть Mono типа Void. При подписке на этот Mono, запрос будет отправлен.

11.4.3 Удаление ресурсов

WebClient также позволяет удалять ресурсы с помощью метода delete(). Например, следующий код удаляет ингредиент для переданого ID:

Mono result = webClient

.delete()

.uri("/ingredients/{id}", ingredientId)

.retrieve()

.bodyToMono(Void.class)

.subscribe();

Как и в случае PUT запросов, запросы DELETE обычно не имеют полезной нагрузки. Вы снова возвращаетесь и подписываетесь на Mono, чтобы отправить запрос.

11.4.4 Обработка ошибок

До сих пор все примеры WebClient предполагали удачное завершение; не было ответов с кодами состояния 400 или 500. Если будет возвращен любой из статусов ошибок, WebClient зарегистрирует ошибку; в противном случае, он будет молча игнорировать это.

Если вам нужно обработать такие ошибки, то вызов onStatus() можно использовать для указания того, как должны обрабатываться различные коды состояния HTTP. onStatus() принимает две функции: функцию предиката, которая используется для соответствия статусу HTTP, и функцию, которая, учитывая объект ClientResponse, возвращает Mono.

Чтобы продемонстрировать, как onStatus() может быть использован для создания пользовательского обработчика ошибок, рассмотрим следующее использование WebClient, целью которого является извлечение ингредиента с учетом его идентификатора:

Mono ingredientMono = webClient

.get()

.uri("http://localhost:8080/ingredients/{id}", ingredientId)

.retrieve()

.bodyToMono(Ingredient.class);

До тех пор значение в идентификаторе ингредиента соответствует известному ресурсу ингредиента, то результирующий Mono опубликует объект ингредиента, когда он будет подписан. Но что произойдет, если не будет подходящего ингредиента?

При подписке на Mono или Flux, который может закончиться ошибкой, важно зарегистрировать получателя ошибок, а также получателя данных в вызове метода subscribe():

ingredientMono.subscribe(

ingredient -> {

// обрабатывать данные ингредиента

...

},

error-> {

// разобраться с ошибкой

...

});

Если ресурс ингредиента найден, то первая лямбда (потребитель данных), предоставленная subscribe(), вызывается с соответствующим объектом ингредиента. Но если он не найден, то запрос отвечает кодом состояния HTTP 404 (не найден), что приводит к тому, что вторая лямбда (потребитель ошибок) по умолчанию получает исключение WebClientResponseException.

Самая большая проблема с WebClientResponseException заключается в том, что он довольно неспецифичен относительно того, что могло пойти не так, чтобы вызвать сбой Mono. Его название предполагает, что была ошибка в ответе на запрос, сделанный WebClient, но вам нужно изучить исключение WebClientResponseException, чтобы узнать, что пошло не так. И в любом случае было бы неплохо, если бы исключение, предоставленное потребителю ошибок, было более специфичным для домена, а не для WebClient.

Добавляя пользовательский обработчик ошибок, вы можете предоставить код, который преобразует код состояния в Throwable по вашему выбору. Предположим, что вы хотите, чтобы неудачный запрос для ресурса ингредиента привел к тому, что Mono завершилось ошибкой с UnknownIngredientException. Вы можете добавить вызов onStatus() после вызова retrieve(), чтобы добиться этого:

Mono ingredientMono = webClient

.get()

.uri("http://localhost:8080/ingredients/{id}", ingredientId)

.retrieve()

.onStatus(HttpStatus::is4xxClientError,

response -> Mono.just(new UnknownIngredientException()))

.bodyToMono(Ingredient.class);

Первый аргумент в вызове onStatus() - это предикат, задающий состояние Http и возвращающий значение true, если требуется обработать код состояния. И если код состояния совпадает, то ответ будет возвращен функции во втором аргументе для обработки, как он считает нужным, в конечном итоге возвращая Mono типа Throwable.

В этом примере, если код состояния представляет собой код состояния уровня 400 (например, ошибка клиента), Mono будет возвращен с UnknownIngredientException. Это приводит к тому, что ingredientMono терпит неудачу с этим исключением.

Обратите внимание, что HttpStatus::is4xxClientError - это ссылка на метод is4xxClientError метода HttpStatus. Именно этот метод будет вызван для данного HttpStatus объекта. Если вы хотите, вы можете использовать другой метод в HttpStatus в качестве ссылки на метод; или вы можете предоставить свою собственную функцию в виде ссылки на лямбду или метод, которая возвращает boolean.

Например, вы можете получить еще более точную обработку ошибок, специально проверив состояние HTTP 404 (НЕ НАЙДЕНО), изменив вызов onStatus(), чтобы он выглядел следующим образом:

Mono ingredientMono = webClient

.get()

.uri("http://localhost:8080/ingredients/{id}", ingredientId)

.retrieve()

.onStatus(status -> status == HttpStatus.NOT_FOUND,

response -> Mono.just(new UnknownIngredientException()))

.bodyToMono(Ingredient.class);

Стоит также отметить, что вы можете иметь столько вызовов onStatus(), сколько вам нужно для обработки любых кодов состояния HTTP, которые могут возвращаться в ответе.

11.4.5 Обмен запросами

До этого момента вы использовали метод retrieve() для обозначения отправки запроса при работе с WebClient. В этих случаях метод retrieve() возвращает объект типа ResponseSpec, с помощью которого можно обрабатывать ответ с вызовами таких методов, как onStatus(), bodyToFlux() и bodyToMono(). Работа со спецификацией ответа хороша для простых случаев, но она несколько ограничена. Например, если вам нужен доступ к заголовкам ответа или значениям cookie, ResponseSpec вам не подойдет.

Когда ResponseSpec заканчивается, вы можете попробовать вызвать exchange() вместо retrieve(). Метод exchange() возвращает Mono типа ClientResponse, к которому можно применить реактивные операции для проверки и использования данных из всего ответа, включая полезную нагрузку, заголовки и файлы cookie.

Прежде чем мы рассмотрим, что отличает exchange() от retrieve(), давайте начнем с того, насколько они похожи. Следующий фрагмент кода использует WebClient и exchange() для извлечения одного ингредиента по идентификатору:

Mono ingredientMono = webClient

.get()

.uri("http://localhost:8080/ingredients/{id}", ingredientId)

.exchange()

.flatMap(cr -> cr.bodyToMono(Ingredient.class));

Это примерно эквивалентно следующему примеру, который использует retrieve():

Mono ingredientMono = webClient

.get()

.uri("http://localhost:8080/ingredients/{id}", ingredientId)

.retrieve()

.bodyToMono(Ingredient.class);

В примере exchange() вместо использования ResponseSpec объекта bodyToMono() для получения Mono вы получаете Mono, к которому можно применить функцию плоского отображения для сопоставления ClientResponse с Mono, который сглаживается в результирующий Mono.

Теперь давайте посмотрим, что отличает exchange(). Предположим, что ответ на запрос может включать заголовок с именем X_UNAVAILABLE со значением true, указывающим на то, что (по какой-то причине) рассматриваемый ингредиент недоступен. И для обсуждения, предположим, что если этот заголовок существует, вы хотите, чтобы результирующий Mono был пустым — ничего не возвращать. Вы можете достичь этого сценария, добавив еще один вызов flatMap() таким образом, чтобы весь вызов веб-клиента выглядел следующим образом:

Mono ingredientMono = webClient

.get()

.uri("http://localhost:8080/ingredients/{id}", ingredientId)

.exchange()

.flatMap(cr -> {

if (cr.headers().header("X_UNAVAILABLE").contains("true")) {

return Mono.empty();

}

return Mono.just(cr);

})

.flatMap(cr -> cr.bodyToMono(Ingredient.class));

Новый вызов flatMap() проверяет заголовки данного объекта запроса клиента, ища заголовок с именем X_UNAVAILABLE со значением true. Если он найден, он возвращает пустое Mono. В противном случае он возвращает новое Mono, содержащее ответ клиента. В любом случае, возвращенный Mono будет сглажен в Mono, с которым будет работать следующий вызов flatMap().

11.5 Securing reactive web APIs

Пока существует Spring Security (и даже до этого, когда он был известен как Acegi Security), его модель веб-безопасности была построена вокруг фильтров сервлетов. Ну, это имеет смысл. Если вам нужно перехватить запрос, привязанный к веб framework-у на основе сервлета, чтобы гарантировать, что у инициатора запроса есть надлежащие полномочия, фильтр сервлета является очевидным выбором. Но But Spring WebFlux вносит изменения в этот подход.

При написании веб-приложения с использованием Spring WebFlux нет никакой гарантии, что сервлеты будут задействованы. Фактически, реактивное веб-приложение, скорее всего, будет построено на Netty или каком-либо другом сервере, не являющемся сервлетом. Означает ли это, что Spring Security на основе фильтра сервлетов нельзя использовать для защиты приложений Spring WebFlux?

Это правда, что с помощью сервлетов, фильтров не подойдет при защите приложения Spring WebFlux. Но Spring Security все еще справляется с этой задачей. Начиная с версии 5.0.0, Spring Security можно использовать для защиты Spring MVC на основе сервлетов и реактивных приложений Spring WebFlux. Для этого используется Spring WebFilter, Spring-специфичный фильтр сервлетов, который не требует зависимости от API сервлета.

Что еще более примечательно, так это то, что модель конфигурации для реактивного Spring Security не сильно отличается от того, что вы видели в главе 4. Фактически, в отличие от Spring WebFlux, который имеет отдельную зависимость от Spring MVC, Spring Security является тот же стартер безопасности Spring Boot, независимо от того, собираетесь ли вы использовать его для защиты веб-приложения Spring MVC или приложения, написанного с использованием Spring WebFlux. Как напоминание, вот как выглядит security стартер:

org.springframework.boot

spring-boot-starter-security

Тем не менее, есть несколько небольших различий между моделями реактивной и нереактивной конфигурации Spring Security. Стоит взглянуть на сравнение двух моделей конфигурации.

11.5.1 Настройка реактивного web security

Напомним, что настройка Spring Security для защиты веб-приложения Spring MVC обычно включает в себя создание нового класса конфигурации, расширяющего WebSecurityConfigurerAdapter и снабженного аннотацией @EnableWebSecurity. Такой класс конфигурации переопределяет метод configuration() для указания специфики web security, например, какие полномочия требуются для определенных путей запроса. Следующий простой класс конфигурации Spring Security служит напоминанием о том, как настроить безопасность для нереактивного приложения Spring MVC:

@Configuration

@EnableWebSecurity

public class SecurityConfig extends WebSecurityConfigurerAdapter {

@Override

protected void configure(HttpSecurity http) throws Exception {

http

.authorizeRequests()

.antMatchers("/design", "/orders").hasAuthority("USER")

.antMatchers("/**").permitAll();

}

}

Теперь давайте посмотрим, как эта же конфигурация может выглядеть для реактивного приложения Spring WebFlux. В следующем списке показан класс конфигурации реактивной безопасности, который примерно эквивалентен простой конфигурации безопасности из предыдущих версий.

Листинг 11.2. Настройка Spring Security для приложения Spring WebFlux

@Configuration

@EnableWebFluxSecurity

public class SecurityConfig {

@Bean

public SecurityWebFilterChain securityWebFilterChain(

ServerHttpSecurity http) {

return http

.authorizeExchange()

.pathMatchers("/design", "/orders").hasAuthority("USER")

.anyExchange().permitAll()

.and()

.build();

}

}

Как вы можете видеть, есть много того, что знакомо, но в то же время отличающегося. Вместо @EnableWebSecurity этот новый класс конфигурации аннотируется с помощью @EnableWebFluxSecurity. Более того, класс конфигурации не расширяет WebSecurityConfigurerAdapter или любой другой базовый класс вообще. Поэтому он также не переопределяет методы configure().

Вместо метода configure() объявляется компонент типа SecurityWebFilterChain с помощью метода securityWebFilterChain(). Тело securityWebFilterChain() не сильно отличается от предыдущих конфигураций метода configure(), но есть некоторые тонкие изменения.

Прежде всего, конфигурация объявляется с использованием заданного объекта ServerHttpSecurity вместо объекта HttpSecurity. Используя данный ServerHttpSecurity, вы можете вызвать authorizeExchange(), который примерно эквивалентен authorizeRequests(), чтобы задать безопасность на уровне запросов.

ПРИМЕЧАНИЕ ServerHttpSecurity является новинкой в Spring Security 5 и является реактивным аналогом HttpSecurity.

При сопоставлении путей вы все равно можете использовать подстановочные пути в стиле Ant, но делайте это с помощью метода pathMatchers() вместо antMatchers(). И для удобства вам больше не нужно указывать универсальный путь в Ant-стиле для /**, потому что anyExchange() возвращает все, что вам нужно.

Наконец, поскольку вы объявляете SecurityWebFilterChain как bean, а не переопределяете метод фрэймворка, вы должны вызвать метод build(), чтобы собрать все правила безопасности в возвращаемом SecurityWebFilterChain.

Помимо этих небольших различий, настройка веб-безопасности ничем не отличается для Spring WebFlux от Spring MVC. Но как насчет пользовательских данных?

11.5.2 Конфигурирование службы реактивных данных пользователя

Расширяя WebSecurityConfigurerAdapter, вы переопределяете один метод configure() для объявления правил веб-безопасности и другой метод configure() для настройки логики аутентификации, обычно путем определения объекта UserDetails. В качестве напоминания о том, как это выглядит, рассмотрим следующий переопределенный метод configure(), который использует внедренный объект UserRepository в анонимной реализации UserDetailsService для поиска пользователя по имени пользователя:

@Autowired

UserRepository userRepo;

@Override

protected void

configure(AuthenticationManagerBuilder auth)

throws Exception {

auth

.userDetailsService(new UserDetailsService() {

@Override

public UserDetails loadUserByUsername(String username)

throws UsernameNotFoundException {

User user = userRepo.findByUsername(username)

if (user == null) {

throw new UsernameNotFoundException(

username " + not found")

}

return user.toUserDetails();

}

});

}

В этой нереактивной конфигурации вы переопределяете единственный метод, требуемый UserDetailsService, loadUserByUsername(). Внутри этого метода вы используете внедренный UserRepository для поиска пользователя по заданному имени пользователя. Если имя не найдено, вы бросаете исключение UsernameNotFoundException. Но если он найден, то вы вызываете вспомогательный метод toUserDetails() для возврата результирующего объекта UserDetails.

В реактивной конфигурации безопасности вы не переопределяете метод configure(). Вместо этого вы объявляете bean-компонент ReactiveUserDetailsService. ReactiveUserDetailsService является реактивным эквивалентом UserDetailsService. Как и UserDetailsService, ReactiveUserDetailsService требует реализации только одного метода. В частности, метод findByUsername() возвращает Mono вместо необработанного объекта UserDetails.

В следующем примере объявляется, что bean-компонент ReactiveUserDetailsService использует внедренный UserRepository, который предположительно является реактивным Spring Data репозиторием (о котором мы поговорим подробнее в следующей главе):

@Service

public ReactiveUserDetailsService userDetailsService(

UserRepository userRepo) {

return new ReactiveUserDetailsService() {

@Override

public Mono findByUsername(String username) {

return userRepo.findByUsername(username)

.map(user -> {

return user.toUserDetails();

});

}

};

}

Здесь Mono возвращается по мере необходимости, но метод UserRepository.findByUsername() возвращает Mono. Поскольку это Mono, вы можете вызвать операции, такие как map(), чтобы преобразовать Mono в Mono.

В этом случае операция map() применяется с лямбда-выражением, которое вызывает вспомогательный метод toUserDetails() для объекта User, опубликованного Mono. Это преобразует User в UserDetails. Как следствие, операция .map() возвращает Mono, который является именно тем, что требуется для ReactiveUserDetailsService.findByUsername().

ИТОГО:

- Spring WebFlux предлагает реактивный веб-фреймворк, модель программирования которого соответствует модели Spring MVC, даже разделяя многие из тех же самых аннотаций.

- Spring 5 также предлагает функциональную модель программирования в качестве альтернативы Spring WebFlux.

- Реактивные контроллеры можно протестировать с помощью WebTestClient.

- На стороне клиента Spring 5 предлагает WebClient, реактивный аналог Spring RestTemplate.

- Хотя WebFlux имеет некоторые существенные изменения для базовых механизмов защиты веб-приложений, Spring Security 5 поддерживает реактивную безопасность с помощью модели программирования, которая не сильно отличается от нереактивных приложений Spring MVC.

Загрузка...