Reactive Programming with Reactor 3

Reactor
6,916 views

Open Source Your Knowledge, Become a Contributor

Technology knowledge has to be shared and made accessible for free. Join the movement.

Create Content

Flux

Description

A Flux<T> is a Reactive Streams Publisher, augmented with a lot of operators that can be used to generate, transform, orchestrate Flux sequences.

It can emit 0 to n <T> elements (onNext event) then either completes or errors (onComplete and onError terminal events). If no terminal event is triggered, the Flux is infinite.

  • Static factories on Flux allow to create sources, or generate them from several callbacks types.
  • Instance methods, the operators, let you build an asynchronous processing pipeline that will produce an asynchronous sequence.
  • Each Flux#subscribe() or multicasting operation such as Flux#publish and Flux#publishNext will materialize a dedicated instance of the pipeline and trigger the data flow inside it.

See the javadoc here.

Marble diagram representation of a Flux

Flux in action :

Flux.fromIterable(getSomeLongList())
    .delayElements(Duration.ofMillis(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResumeWith(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .subscribe(System.out::println);

Practice

In this lesson we'll see different factory methods to create a Flux.

Tip TIP: If you want some insight as to what is going on inside a Flux or Mono you are about to return during one of these exercises, you can always append .log() to the flux just before returning it. Part 6 makes use of that.

Let's try a very simple example: just return an empty flux.

static <T> Flux<T> empty()
// Create a Flux that completes without emitting any item.
Empty flux

One can also create a Flux out of readily available data:

static <T> Flux<T> just(T... data)
// Create a new Flux that emits the specified item(s) and then complete.
Flux from values

This time we will use items of a list to publish on the flux with fromIterable:

static <T> Flux<T> fromIterable(Iterable<? extends T> it)
// Create a Flux that emits the items contained in the provided Iterable.
Create a Flux from a List

In imperative synchronous code, it's easy to manage exceptions with familiar try-catch blocks, throw instructions...

But in an asynchronous context, we have to do things a bit differently. Reactive Streams defines the onError signal to deal with exceptions. Note that such an event is terminal: this is the last event the Flux will produce.

Flux#error produces a Flux that simply emits this signal, terminating immediately:

static <T> Flux<T> error(Throwable error)
// Create a Flux that completes with the specified error.
Create a Flux that emits an IllegalStateException

To finish with Flux, let's try to create a Flux that produces ten elements, at a regular pace. In order to do that regular publishing, we can use interval. But it produces an infinite stream (like ticks of a clock), and we want to take only 10 elements, so don't forget to precise it.

static Flux<Long> interval(Duration period)
// Create a new Flux that emits an ever incrementing long starting with 0 every period on the global timer.
Create a Flux that emits 10 increasing values
Open Source Your Knowledge: become a Contributor and help others learn. Create New Content
1
2
package io.pivotal.literx;
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX