Reactive Programming with Reactor 3
Flux
Description
A Flux<T> is a Reactive Streams Publisher, augmented with a lot of operators that can be used to generate, transform, and orchestrate Flux sequences.
It can emit 0 to n <T> elements (onNext events) and then either completes or errors (onComplete and onError terminal events). If no terminal event is triggered, the Flux is infinite.
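To make the three signals concrete, here is a minimal sketch that subscribes with one callback per signal and records what arrives. The class name FluxSignalsExample and the helper recordSignals are hypothetical names introduced for illustration; the sources used are synchronous, so every signal has been delivered by the time subscribe returns.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of Reactor.
class FluxSignalsExample {

    // Subscribes to the given Flux and records each signal in arrival order.
    static <T> List<String> recordSignals(Flux<T> flux) {
        List<String> signals = new ArrayList<>();
        flux.subscribe(
                value -> signals.add("onNext(" + value + ")"),              // 0 to n elements
                error -> signals.add("onError(" + error.getMessage() + ")"), // terminal
                () -> signals.add("onComplete"));                            // terminal
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(recordSignals(Flux.just("a", "b")));
        System.out.println(recordSignals(Flux.error(new IllegalStateException("boom"))));
    }
}
```

A sequence ends with at most one terminal signal, so a Flux that errors never also reports onComplete.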
- Static factories on Flux let you create sources, or generate them from several types of callbacks.
- Instance methods, the operators, let you build an asynchronous processing pipeline that will produce an asynchronous sequence.
- Each Flux#subscribe() or multicasting operation such as Flux#publish and Flux#publishNext will materialize a dedicated instance of the pipeline and trigger the data flow inside it.
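The last point can be observed with a small experiment. The sketch below counts how many times a pipeline is subscribed to; the class ColdFluxExample and its method are hypothetical names used only for this illustration.

```java
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class, not part of Reactor.
class ColdFluxExample {

    // Returns the subscription count before any subscribe() call and after two of them.
    static List<Integer> subscriptionCounts() {
        AtomicInteger runs = new AtomicInteger();

        // Assembling the pipeline only describes the processing; no data flows yet.
        Flux<Integer> pipeline = Flux.range(1, 3)
                .doOnSubscribe(subscription -> runs.incrementAndGet())
                .map(i -> i * 10);

        int beforeSubscribe = runs.get();

        // Each subscribe() triggers its own run of the pipeline.
        pipeline.subscribe();
        pipeline.subscribe();

        return List.of(beforeSubscribe, runs.get());
    }

    public static void main(String[] args) {
        System.out.println(subscriptionCounts());
    }
}
```

The count stays at zero until the first subscribe() call, then increases once per subscriber.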
See the javadoc here.
Flux in action:
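Flux.fromIterable(getSomeLongList())
    .delayElements(Duration.ofMillis(100))          // delay each element by 100 ms
    .doOnNext(serviceA::someObserver)               // side effect on each element
    .map(d -> d * 2)
    .take(3)                                        // keep the first 3 elements, then cancel upstream
    .onErrorResume(errorHandler::fallback)          // switch to a fallback sequence on error
    .doAfterTerminate(serviceM::incrementTerminate) // runs after completion or error
    .subscribe(System.out::println);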
Practice
In this lesson we'll see different factory methods to create a Flux.
TIP: If you want some insight as to what is going on inside a Flux or Mono you are about to return during one of these exercises, you can always append .log() to the flux just before returning it. Part 6 makes use of that.
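As a rough illustration of the tip, the sketch below appends log() to a small Flux before returning it. The class LogExample and the method numbers are hypothetical; the log lines themselves go to whichever logging backend Reactor finds at runtime, so their exact format is not shown here.

```java
import reactor.core.publisher.Flux;

import java.util.List;

// Hypothetical example class, not part of Reactor.
class LogExample {

    // log() traces the signals passing through (subscription, requests, elements,
    // termination) without changing the elements themselves.
    static Flux<Integer> numbers() {
        return Flux.just(1, 2, 3).log();
    }

    public static void main(String[] args) {
        List<Integer> values = numbers().collectList().block();
        System.out.println(values);
    }
}
```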
Let's try a very simple example: just return an empty flux.
static <T> Flux<T> empty()
// Create a Flux that completes without emitting any item.
One can also create a Flux out of readily available data:
static <T> Flux<T> just(T... data)
// Create a new Flux that emits the specified item(s) and then completes.
This time we will use items of a list to publish on the flux with fromIterable:
static <T> Flux<T> fromIterable(Iterable<? extends T> it)
// Create a Flux that emits the items contained in the provided Iterable.
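The three factories seen so far (empty, just and fromIterable) can be compared side by side. This is only one possible sketch: the class FluxFactoriesExample, its method names and the sample values are assumptions made for illustration, not the exercise's expected answers.

```java
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.List;

// Hypothetical example class, not part of Reactor.
class FluxFactoriesExample {

    // Completes immediately without emitting anything.
    static Flux<String> emptyFlux() {
        return Flux.empty();
    }

    // Emits the values passed as arguments, then completes.
    static Flux<String> fromValues() {
        return Flux.just("foo", "bar");
    }

    // Emits each item of the given list in iteration order, then completes.
    static Flux<String> fromList(List<String> items) {
        return Flux.fromIterable(items);
    }

    public static void main(String[] args) {
        // collectList().block() waits for completion; convenient in a demo or test.
        System.out.println(emptyFlux().collectList().block());
        System.out.println(fromValues().collectList().block());
        System.out.println(fromList(Arrays.asList("x", "y", "z")).collectList().block());
    }
}
```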
In imperative synchronous code, it's easy to manage exceptions with familiar try-catch blocks and throw instructions.
But in an asynchronous context, we have to do things a bit differently. Reactive Streams defines the onError signal to deal with exceptions. Note that such an event is terminal: this is the last event the Flux will produce.
Flux#error produces a Flux that simply emits this signal, terminating immediately:
static <T> Flux<T> error(Throwable error)
// Create a Flux that terminates with the specified error.
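To see that onError really is the last event, the following sketch concatenates three sources, the second of which is Flux.error, and recovers with onErrorResume. The class FluxErrorExample, the exception and the message strings are made up for illustration.

```java
import reactor.core.publisher.Flux;

import java.util.List;

// Hypothetical example class, not part of Reactor.
class FluxErrorExample {

    static List<String> emitThenFail() {
        return Flux.concat(
                        Flux.just("a"),
                        Flux.<String>error(new IllegalStateException("boom")),
                        Flux.just("never emitted")) // not reached: the error is terminal
                // Replace the error signal with a fallback sequence.
                .onErrorResume(e -> Flux.just("recovered: " + e.getMessage()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(emitThenFail());
    }
}
```

The third source is never subscribed to, because the concatenated sequence terminates as soon as the second source signals its error.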
To finish with Flux, let's try to create a Flux that produces ten elements, at a regular pace.
In order to do that regular publishing, we can use interval.
But it produces an infinite stream (like ticks of a clock), and we want to take only 10 elements, so don't forget to specify that.
static Flux<Long> interval(Duration period)
// Create a new Flux that emits an ever incrementing long starting with 0 every period on the global timer.
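One way to bound the infinite interval is to chain take(10) onto it. The sketch below is a possible approach rather than the reference solution; the class IntervalExample, the method firstTen and the 100 ms period are assumptions.

```java
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;

// Hypothetical example class, not part of Reactor.
class IntervalExample {

    // Emits 0, 1, ..., 9 at the given pace, then completes.
    static Flux<Long> firstTen(Duration period) {
        return Flux.interval(period) // infinite source of ticks
                .take(10);           // complete after ten elements and cancel the ticks
    }

    public static void main(String[] args) {
        // The ticks are emitted on a timer thread, so block() is used here to
        // keep the main thread alive until the sequence completes.
        List<Long> ticks = firstTen(Duration.ofMillis(100)).collectList().block();
        System.out.println(ticks);
    }
}
```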