Reactive Programming with Reactor 3
Open Source Your Knowledge, Become a Contributor
Technology knowledge has to be shared and made accessible for free. Join the movement.
Blocking to Reactive
Description
The big question is "How to deal with legacy, non reactive code?".
Say you have blocking code (eg. a JDBC connection to a database), and you want to integrate that into your reactive pipelines while avoiding too much of an impact on performance.
The best course of action is to isolate such intrinsically blocking parts of your code into
their own execution context via a Scheduler
, keeping the efficiency of the rest of the
pipeline high and only creating extra threads when absolutely needed.
In the JDBC example you could use the fromIterable
factory method. But how do you prevent
the call to block the rest of the pipeline?
Practice
The subscribeOn
method allow to isolate a sequence from the start on a provided Scheduler
.
For example, the Schedulers.boundedElastic()
will create a pool of threads that grows on demand,
releasing threads that haven't been used in a while automatically. In order to avoid too many
threads due to abusing of this easy option, the boundedElastic
Scheduler places an upper limit
to the number of threads it can create and reuse (unlike the now deprecated elastic()
one).
Use that trick to slowly read all users from the blocking repository
in the first exercise.
Note that you will need to wrap the call to the repository inside a Flux.defer
lambda.
For slow subscribers (eg. saving to a database), you can isolate a smaller section of the
sequence with the publishOn
operator. Unlike subscribeOn
, it only affects the part of
the chain below it, switching it to a new Scheduler
.
As an example, you can use doOnNext
to perform a save
on the repository
, but first
use the trick above to isolate that save into its own execution context. You can make it
more explicit that you're only interested in knowing if the save succeeded or failed by
chaining the then()
operator at the end, which returns a Mono<Void>
.