Spring Web-Flux/Project Reactor
Based on reactive programming paradigm

Introduction
Reactive programming represents a change in how we think about an application’s execution model. In a Reactive application, execution does not follow a linear model where one request is handled by one thread, instead, multiple requests are handled in an event-driven and non-blocking manner.
Reactive programming provides an event-based, asynchronous, and streaming programming model that can handle a large volume of concurrent requests coming from a single or multiple clients. Reactive applications usually require a small number of threads to scale vertically, rather than scaling horizontally. Designing and implementing applications using Reactive programming enables the application to maximize its use of the CPU, allowing the application to be more performant and efficient than traditional Java web applications.
Reactive programming promotes
1. Asynchronous
2. Non-blocking
3. Event-driven approach to data processing (event-driven architecture)
The term, “reactive,” refers to programming models that are built around reacting to change like network components reacting to I/O events, UI controllers reacting to mouse events, and others. In that sense, non-blocking is reactive, because, instead of being blocked, we are now in the mode of reacting to notifications as operations complete or data becomes available. (will see an example of how this happens)
Who is using?

Background
The origins of Reactive Programming can probably be traced to the 1970s or even earlier.
As a first step in the direction of reactive programming, Microsoft created the Reactive Extensions (Rx) library in the .NET ecosystem. Rx was ported to several languages and platforms, including JavaScript, Python, C++, Swift, and Java, of course, called RxJava
The fundamental idea of ReactiveX is that events are data and data are events.
In the JVM world, there are few implementations based on Reactive programming paradigm
1. Reactive Streams
2. RxJava 2.0 (Original developed by Netflix and later open-sourced)
3. Project Reactor (This is our topic)
4. Vert.x
Why Project Reactor?
Reactor is a fourth-generation reactive library, based on the Reactive Streams
specification, for building non-blocking applications on the JVM
Features :
1. Asynchronous and Non-Blocking for I/O operations
2. Backpressure saving subscribers from drowning (using reactor netty server)
3. Error handling channels
4. handle a large volume of concurrent requests coming from a single or multiple clients.
5. Function style programming
6. reactive streams are push-based which improves performance
7. Simple to use API’s
Mono
(for [0|1] elements) andFlux
(for [N] elements)Last Point should be the main reason to choose project reactor over other implementations
Let’s jump into more details on each point …
1. Non-Blocking for I/O calls
The key expected benefit of reactive and non-blocking is the ability to scale with a small, fixed number of threads and less memory.
Blocking Can Be Wasteful
There are, broadly, two ways one can improve a program’s performance:
parallelize to use more threads and more hardware resources.
seek more efficiency in how current resources are used.
Blocking wastes resources. If you look closely, as soon as a program involves some latency (notably I/O, such as a database request or a network call), resources are wasted because threads (possibly many threads) now sit idle, waiting for data.
Blocking request processing

One thread per connection
Non-blocking async request processing

Single Thread for multiple connections
Key points:
1. Context Switching using on publisher method on flux/mono
2. Event handler/ event loop
Event handling is achieved by using: Channel, Buffer and Selector.

Some commonly used Channels are listed below:
- FileChannel → read and write through file
- SocketChannel → read and write through TCP socket
- ServerSocketChannel →Listen to the TCP connection from clients, create new SocketChannel for each TCP connection
- DatagramChannel →read and write through UDP protocol
Buffer can be considered as a data container, which could be implemented by an array. Either reading from a channel or writing to a channel, data must be put into the buffer first.

Selector is the core class in Java NIO, it monitors and processes interested IO events that happened in multiple registered Channels. Through this mechanism, we could maintain multiple connections by just one thread. Only when the IO events actually happen among these connections, the IO process logic are truly invoked. There’s no need to start a new thread each time when a new connection comes in, therefore it would significantly reduce the system load.
In company with Selector, the SelectionKey is another important class, which represents an arrived event. These two classes constitute the key logic of the NIO server[10].
The figure above shows the monitoring of multiple channels (or connections) by a Selector running on a single thread.
Reactor Pattern
1. Reactor
A Reactor runs in a separate thread, and its job is to react to IO events by dispatching the work to the appropriate handler. It’s like a telephone operator in a company who answers calls from clients and transfers the line to the appropriate contact.
2. Handlers
A Handler performs the actual work to be done with an I/O event, similar to the actual officer in the company the client wants to speak to.
A reactor responds to I/O events by dispatching the appropriate handler. Handlers perform non-blocking actions.
In the reactor world, we achieve Asynchronous and Non-Blocking by

2. Backpressure saving subscribers from drowning
The concept of backpressure in Reactive Streams is as elegant as it is powerful. It will enable the use of slow Consumers within Reactive applications without them “choking up” on too much information. The unprocessed data can be buffered.

When a Consumer subscribes itself on a Producer, it will get a Subscription. This will enable a feedback mechanism from the Consumer of the datastream to its Producer. Through it, the Consumer can signal how many data events he’s able to handle.
When the Consumer signals it can handle 5 data events, the Producer will at a maximum call the Consumer 5 times with an onNext method. After consuming these 5 events, the Consumer will ask the Producer for extra events, until an onComplete or onError call occurs instead.
3. Error handling channels
In Reactive Streams, errors are terminal events. As soon as an error occurs, it stops the sequence and gets propagated down the chain of operators to the last step
Ex:
Flux.just(1, 2, 0)
.map(i -> “100 / “ + i + “ = “ + (100 / i)) //this triggers an error with 0
.onErrorReturn(“Divided by zero :(“); // error handling example
Error Handling Operators



To Know more point 4, 5 and 6, Please visit project-reactor,
I will jump directly to Point 7, which is the interesting part of this blog
4. handle a large volume of concurrent requests coming from a single or multiple clients.
5. Function style programming
6. reactive streams are push-based which improves performance
7. Simple to use API’s Mono
(for [0|1] elements) and Flux
(for [N] elements)
Spring Framework:
Spring Boot comparison of reactive (Spring Webflux using project reactor) vs non-reactive stack

Reactor Netty Server
Reactor Netty is an asynchronous event-driven network application framework. It provides non-blocking and backpressure-ready TCP, HTTP, and UDP clients and servers. As the name implies, it’s based on the Netty framework.
Spring WebFlux revolves around 2 API’s Flux and Mono that’s it…
Flux, an Asynchronous Sequence of 0-N Items
The following image shows how a Flux
transforms items:

Mono, an Asynchronous 0–1 Result
The following image shows how a Mono
transforms an item:

Code Examples
Spring controller
Non-Reactive Code
@GetMapping(“/tweets-blocking”)public List<Tweet> getTweetsBlocking() {log.info(“Starting BLOCKING Controller!”);final String uri = getSlowServiceUri();ResponseEntity<List<Tweet>> response = new RestTemplate().exchange(uri, HttpMethod.GET, null, new ParameterizedTypeReference<List<Tweet>>(){});List<Tweet> result = response.getBody();result.forEach(tweet -> log.info(tweet.toString()));log.info(“Exiting BLOCKING Controller!”);return result; }
Let’s observe the log:
Starting BLOCKING Controller!
Tweet(text=RestTemplate rules, username=@user1)
Tweet(text=WebClient is better, username=@user2)
Tweet(text=OK, both are useful, username=@user1)
Exiting BLOCKING Controller!
Reactive Code
@GetMapping(value = “/tweets-non-blocking”)public Flux<Tweet> getTweetsNonBlocking() {log.info(“Starting NON-BLOCKING Controller!”);WebClient.create().get().uri(getSlowServiceUri()).retrieve().bodyToFlux(Tweet.class).subscribe(tweet -> log.info(tweet.toString()));log.info(“Exiting NON-BLOCKING Controller!”);return tweetFlux;}
Let’s observe the log this time:
Starting NON-BLOCKING Controller!
Exiting NON-BLOCKING Controller!
Tweet(text=RestTemplate rules, username=@user1)
Tweet(text=WebClient is better, username=@user2)
Tweet(text=OK, both are useful, username=@user1)
The difference is log order: nothing happens until subscribes (lazy execution model)
Other example of reactive code
return nomineesRepository
.findByYearGreaterThanEqual(year)
.switchIfEmpty(Flux.error(new NomineesNotFoundException()))
.filter(NomineesResponse::isWinner)
.delayElements(Duration.ofMillis(1L))
.onBackpressureBuffer(1)
.map(nomineesResponse ->{nomineesResponse.setEntity("changed the entity");
return nomineesResponse;
});
Reactive Testing Support
Whether you have written a simple chain of Reactor operators or your own operator, automated testing is always a good idea.
You can ask and answer questions such as the following:
1. What is the next expected event?
2. Do you expect the
Flux
to emit a particular value?3. Or maybe to do nothing for the next 300ms?

In order to test it, you want to verify the following scenario:

Example code :
Testing Reactive Method Security
We can test our example
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = HelloWebfluxMethodApplication.class)
public class HelloWorldMessageServiceTests {
@Autowired
HelloWorldMessageService messages;@Test
public void messagesWhenNotAuthenticatedThenDenied() {
StepVerifier.create(this.messages.findMessage())
.expectError(AccessDeniedException.class)
.verify();}@Test
@WithMockUser
public void messagesWhenUserThenDenied() {
StepVerifier.create(this.messages.findMessage())
.expectError(AccessDeniedException.class)
.verify();
}@Test
@WithMockUser(roles = "ADMIN")
public void messagesWhenAdminThenOk() {
StepVerifier.create(this.messages.findMessage())
.expectNext("Hello World!")
.verifyComplete();
}
}

What are the advantages and disadvantages?
Advantages
- Cleaner code, more concise
- Easier to read (once you get the hang of it)
- Easier to scale (pipe any operation)
- Better error handling
- Event-driven inspired -> plays well with streams (Kafka, RabbitMQ,etc)
- Backpressure (client can control flow)
- Non-blocking I/O operations (which means better response times)
Disadvantages
· More memory intensive to store streams of data most of the time (since it is based on streams over time).
· Might feel unconventional to learn at the start(needs everything to be a stream).
· Different programming style compared to conventional java style
· Everything has to be wrapped around Mono/Flux
When to use?
To achieve high throughput
Heavy data flow in the system
When not to use
When there is no much data to process within the system
What are Reactive Systems?
Reactive systems are not the same thing as reactive programming. Reactive programming is used at the code level, while reactive systems deal with architecture.
· Responsiveness: Be available for users and, whatever happens (overload, failure, etc.), be ready to respond to them.
· Resilience: Stay immune to faults, disruptions, and extremely high loads.
· Elasticity: Use of resources efficiently and balance the machine performance — vertical scaling up or down — or easily regulate the number of machines involved — horizontal scaling up or down — depending on the load.
· Message-driven character: Embrace a completely non-blocking communication via sending immutable messages to addressable recipients.
Sample Github repo: https://github.com/mNasiruddin/project-reactor
References
https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html
https://projectreactor.io/docs/core/release/reference/index.html
https://docs.spring.io/spring-security/site/docs/current/reference/html/test-webflux.html