HMH Engineering

HMH Engineering builds fantastic software to meet the challenges facing teachers and learners. We enable and support a wide range of next-generation learning experiences, designing and building apps and services used daily by millions of students and educators across the USA.


Spring WebFlux/Project Reactor

Based on reactive programming paradigm

Reactive Framework behind Spring WebFlux
Event Loop Model

Introduction

Reactive programming represents a change in how we think about an application’s execution model. In a reactive application, execution does not follow a linear model where one request is handled by one thread. Instead, multiple requests are handled in an event-driven and non-blocking manner.

Reactive programming provides an event-based, asynchronous, streaming programming model that can handle a large volume of concurrent requests from one or many clients. Reactive applications usually need only a small number of threads, so they scale vertically on the hardware they already have before they need to scale horizontally. Designing an application this way lets it make better use of the CPU, which can make it more efficient than a traditional thread-per-request Java web application, particularly when the workload is I/O-bound.

Reactive programming promotes an approach to data processing that is:

1. Asynchronous

2. Non-blocking

3. Event-driven (event-driven architecture)

The term “reactive” refers to programming models that are built around reacting to change: network components reacting to I/O events, UI controllers reacting to mouse events, and so on. In that sense, non-blocking is reactive because, instead of being blocked, we are now in the mode of reacting to notifications as operations complete or data becomes available. (We will see an example of how this happens later in this post.)


Background

The origins of Reactive Programming can probably be traced to the 1970s or even earlier.

As a first step in the direction of reactive programming, Microsoft created the Reactive Extensions (Rx) library in the .NET ecosystem. Rx was then ported to several languages and platforms, including JavaScript, Python, C++, Swift, and, of course, Java, where it is called RxJava.

The fundamental idea of ReactiveX is that events are data and data are events.

In the JVM world, there are a few projects based on the reactive programming paradigm:

1. Reactive Streams (a specification rather than a library)

2. RxJava 2.0 (originally developed by Netflix and later open-sourced)

3. Project Reactor (This is our topic)

4. Vert.x

Why Project Reactor?

Reactor is a fourth-generation reactive library, based on the Reactive Streams specification, for building non-blocking applications on the JVM.

Features:

1. Asynchronous and non-blocking I/O operations

2. Backpressure, which saves subscribers from drowning (using the Reactor Netty server)

3. Error handling channels

4. Handles a large volume of concurrent requests coming from one or many clients

5. Functional-style programming

6. Reactive streams are push-based, which can improve performance

7. Simple-to-use APIs: Mono (for [0|1] elements) and Flux (for [N] elements)

The last point is the main reason to choose Project Reactor over other implementations.

Let’s jump into more details on each point …

1. Non-Blocking for I/O calls

The key expected benefit of reactive and non-blocking is the ability to scale with a small, fixed number of threads and less memory.

Blocking Can Be Wasteful

There are, broadly, two ways one can improve a program’s performance:

  • Parallelize to use more threads and more hardware resources.
  • Seek more efficiency in how current resources are used.

Blocking wastes resources. If you look closely, as soon as a program involves some latency (notably I/O, such as a database request or a network call), resources are wasted because threads (possibly many threads) now sit idle, waiting for data.

Blocking request processing: one thread per connection

Non-blocking async request processing: a single thread for multiple connections
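To make the "single thread for multiple connections" idea concrete, here is a minimal JDK-only sketch. It is not how WebFlux or Netty is implemented; the class name, the 50 ms delay, and the use of a scheduled executor to stand in for slow I/O are all assumptions made for illustration. Each simulated request registers a callback instead of parking a dedicated thread, and one loop thread runs every callback.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SingleThreadLoopSketch {

    /**
     * Simulates `requests` slow I/O calls whose results each arrive after 50 ms
     * and returns the names of the threads that handled those results.
     */
    static Set<String> handle(int requests) {
        // One thread plays the role of the event loop for every request.
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        Set<String> handlerThreads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(requests);
        try {
            for (int i = 0; i < requests; i++) {
                // Register a callback rather than blocking one thread per request.
                loop.schedule(() -> {
                    handlerThreads.add(Thread.currentThread().getName());
                    done.countDown();
                }, 50, TimeUnit.MILLISECONDS);
            }
            done.await(); // the caller waits only so the demo can report a result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the demo", e);
        } finally {
            loop.shutdown();
        }
        return handlerThreads;
    }

    public static void main(String[] args) {
        System.out.println("Threads used for 100 requests: " + handle(100).size());
    }
}
```

With a thread-per-connection model, the same 100 waiting requests would hold 100 threads idle; here the set of handler threads contains a single entry.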

Key points:

1. Context switching using the publishOn method on Flux/Mono

2. Event handler/event loop

Event handling is achieved by using: Channel, Buffer and Selector.

Some commonly used Channels are listed below:

  • FileChannel → reads from and writes to files
  • SocketChannel → reads from and writes to a TCP socket
  • ServerSocketChannel → listens for TCP connections from clients and creates a new SocketChannel for each one
  • DatagramChannel → reads and writes over UDP

A Buffer can be thought of as a data container, typically backed by an array. Whether you are reading from a channel or writing to one, the data must pass through a buffer first.

Thread-Selector-Channels

Selector is the core class in Java NIO. It monitors multiple registered Channels and reports the I/O events it has been asked to watch for. Through this mechanism, a single thread can maintain many connections: the I/O processing logic is invoked only when an event actually occurs on one of them. There is no need to start a new thread each time a connection comes in, which significantly reduces the load on the system.

Alongside Selector, SelectionKey is another important class. It represents the registration of a channel with a selector and records which events are ready on that channel. These two classes constitute the key logic of an NIO server[10].

The figure above shows the monitoring of multiple channels (or connections) by a Selector running on a single thread.
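The Channel, Buffer, and Selector pieces described above can be sketched with the JDK alone. The example below is an illustrative sketch, not production code: the class and method names are made up for this post, and for brevity the demo client runs on the same thread as the server. It registers a ServerSocketChannel for accept events, registers each accepted SocketChannel for read events, and echoes back whatever it reads through a ByteBuffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class SelectorEchoSketch {

    /** Runs a selector-driven echo server and a demo client on the calling thread. */
    static String echoOnce(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: the OS picks a free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Blocking client, used only to drive the demo.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.write(ByteBuffer.wrap(payload));

                int echoed = 0;
                while (echoed < payload.length) {
                    selector.select(); // waits until at least one registered channel is ready
                    Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                    while (ready.hasNext()) {
                        SelectionKey key = ready.next();
                        ready.remove();
                        if (key.isAcceptable()) {
                            SocketChannel connection = server.accept();
                            if (connection != null) {
                                connection.configureBlocking(false);
                                connection.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()) {
                            echoed += echo((SocketChannel) key.channel());
                        }
                    }
                }

                ByteBuffer reply = ByteBuffer.allocate(payload.length);
                while (reply.hasRemaining() && client.read(reply) != -1) {
                    // keep reading until the whole echo has arrived
                }
                return new String(reply.array(), StandardCharsets.UTF_8);
            } finally {
                // Close every channel still registered with the selector.
                for (SelectionKey key : selector.keys()) {
                    key.channel().close();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads whatever is available into a buffer and writes it straight back. */
    private static int echo(SocketChannel connection) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(256);
        int read = connection.read(buffer);
        if (read == -1) {
            throw new IOException("client closed before the echo completed");
        }
        buffer.flip(); // switch the buffer from being written to being read
        while (buffer.hasRemaining()) {
            connection.write(buffer);
        }
        return read;
    }

    public static void main(String[] args) {
        System.out.println(echoOnce("hello reactor"));
    }
}
```

A real server would keep the select loop running indefinitely on its own thread and would handle partial writes by registering interest in OP_WRITE rather than looping.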

Reactor Pattern

1. Reactor

A Reactor runs in a separate thread, and its job is to react to IO events by dispatching the work to the appropriate handler. It’s like a telephone operator in a company who answers calls from clients and transfers the line to the appropriate contact.

2. Handlers

A Handler performs the actual work to be done with an I/O event, similar to the actual officer in the company the client wants to speak to.

A reactor responds to I/O events by dispatching the appropriate handler. Handlers perform non-blocking actions.
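The operator-and-officer analogy can be sketched in a few lines of plain Java. This is a toy model under stated assumptions: the Event and Reactor classes, the event type names, and the in-memory queue standing in for real I/O events are all hypothetical. The reactor does no work itself; it only looks up the handler registered for each event type and dispatches to it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;

final class ReactorPatternSketch {

    /** An I/O event as the reactor sees it: a type used for routing, plus a payload. */
    static final class Event {
        final String type;
        final String payload;

        Event(String type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    /** The "telephone operator": routes each event to the handler registered for its type. */
    static final class Reactor {
        private final Map<String, Function<String, String>> handlers = new HashMap<>();
        private final Queue<Event> pending = new ArrayDeque<>();

        void register(String type, Function<String, String> handler) {
            handlers.put(type, handler);
        }

        void submit(Event event) {
            pending.add(event);
        }

        /** The event loop: take the next event, find its handler, dispatch, repeat. */
        List<String> runUntilIdle() {
            List<String> results = new ArrayList<>();
            Event event;
            while ((event = pending.poll()) != null) {
                Function<String, String> handler = handlers.get(event.type);
                results.add(handler == null
                        ? "no handler for " + event.type
                        : handler.apply(event.payload));
            }
            return results;
        }
    }

    static List<String> demo() {
        Reactor reactor = new Reactor();
        // Handlers do the actual work and must not block the loop.
        reactor.register("ACCEPT", payload -> "accepted " + payload);
        reactor.register("READ", payload -> "read " + payload.length() + " bytes");
        reactor.submit(new Event("ACCEPT", "client-1"));
        reactor.submit(new Event("READ", "hello"));
        reactor.submit(new Event("WRITE", "ignored"));
        return reactor.runUntilIdle();
    }

    public static void main(String[] args) {
        // prints [accepted client-1, read 5 bytes, no handler for WRITE]
        System.out.println(demo());
    }
}
```

Because every handler runs on the loop's thread, a handler that blocks would stall all other events, which is why handlers in this pattern perform only non-blocking actions.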

In the Reactor world, we achieve asynchronous, non-blocking behavior through its composable asynchronous APIs, Flux and Mono (examples follow later in this post).

2. Backpressure saving subscribers from drowning

The concept of backpressure in Reactive Streams is as elegant as it is powerful. It will enable the use of slow Consumers within Reactive applications without them “choking up” on too much information. The unprocessed data can be buffered.

Producer-Consumer model

When a Consumer subscribes to a Producer, it gets a Subscription. This provides a feedback mechanism from the Consumer of the data stream to its Producer: through the Subscription, the Consumer can signal how many data events it is able to handle.

When the Consumer signals that it can handle 5 data events, the Producer will call the Consumer’s onNext method at most 5 times. After consuming these 5 events, the Consumer asks the Producer for more, and this continues until an onComplete or onError call occurs instead.
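The request-5, receive-5, request-again exchange described above can be traced with the JDK’s java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. The sketch below is a simplified, single-threaded illustration: RangeProducer and the logging consumer are hypothetical classes written for this post, and they skip parts of the Reactive Streams rules (for example, rejecting non-positive requests) that a real publisher such as Flux implements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    /** Hypothetical producer: emits 1..count, but never more than the consumer has requested. */
    static final class RangeProducer implements Flow.Publisher<Integer> {
        private final int count;

        RangeProducer(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> consumer) {
            consumer.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext: the outer loop sees the new demand
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        consumer.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        consumer.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Consumer that asks for `batch` items at a time and records every signal in order. */
    static List<String> consume(int count, int batch) {
        List<String> log = new ArrayList<>();
        new RangeProducer(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch = 0;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + batch + ")");
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++receivedInBatch == batch) {
                    receivedInBatch = 0;
                    log.add("request(" + batch + ")"); // batch consumed: ask for more
                    subscription.request(batch);
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        consume(7, 5).forEach(System.out::println);
    }
}
```

For 7 items and a batch size of 5, the log reads request(5), onNext(1) through onNext(5), request(5), onNext(6), onNext(7), onComplete. The producer never gets ahead of what the consumer asked for.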

3. Error handling channels

In Reactive Streams, errors are terminal events. As soon as an error occurs, it stops the sequence and is propagated down the chain of operators to the last step.

Ex:

Flux.just(1, 2, 0)
    .map(i -> "100 / " + i + " = " + (100 / i)) // this triggers an error with 0
    .onErrorReturn("Divided by zero :(");        // error handling example

Error Handling Operators

Error Handling

To learn more about points 4, 5, and 6, please visit the Project Reactor documentation. I will jump directly to point 7, which is the interesting part of this post.

4. Handles a large volume of concurrent requests coming from one or many clients

5. Functional-style programming

6. Reactive streams are push-based, which can improve performance

7. Simple-to-use APIs: Mono (for [0|1] elements) and Flux (for [N] elements)

Spring Framework:

Spring Boot comparison of the reactive stack (Spring WebFlux using Project Reactor) vs. the non-reactive stack

Reactor Netty Server

Reactor Netty is an asynchronous event-driven network application framework. It provides non-blocking and backpressure-ready TCP, HTTP, and UDP clients and servers. As the name implies, it’s based on the Netty framework.

Spring WebFlux revolves around two APIs, Flux and Mono, and that’s it.

Flux, an Asynchronous Sequence of 0-N Items

The following image shows how a Flux transforms items:

Mono, an Asynchronous 0–1 Result

The following image shows how a Mono transforms an item:

Code Examples

Spring controller

Non-Reactive Code

@GetMapping("/tweets-blocking")
public List<Tweet> getTweetsBlocking() {
    log.info("Starting BLOCKING Controller!");
    final String uri = getSlowServiceUri();
    // exchange() blocks the calling thread until the whole response has arrived
    ResponseEntity<List<Tweet>> response = new RestTemplate().exchange(
            uri, HttpMethod.GET, null,
            new ParameterizedTypeReference<List<Tweet>>() {});
    List<Tweet> result = response.getBody();
    result.forEach(tweet -> log.info(tweet.toString()));
    log.info("Exiting BLOCKING Controller!");
    return result;
}

Let’s observe the log:

Starting BLOCKING Controller!

Tweet(text=RestTemplate rules, username=@user1)

Tweet(text=WebClient is better, username=@user2)

Tweet(text=OK, both are useful, username=@user1)

Exiting BLOCKING Controller!

Reactive Code

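@GetMapping(value = "/tweets-non-blocking")
public Flux<Tweet> getTweetsNonBlocking() {
    log.info("Starting NON-BLOCKING Controller!");
    // Building the pipeline does not send the request; nothing runs until subscription
    Flux<Tweet> tweetFlux = WebClient.create()
            .get()
            .uri(getSlowServiceUri())
            .retrieve()
            .bodyToFlux(Tweet.class);
    // subscribe() triggers the call and returns immediately; tweets are logged as they arrive
    tweetFlux.subscribe(tweet -> log.info(tweet.toString()));
    log.info("Exiting NON-BLOCKING Controller!");
    return tweetFlux;
}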

Let’s observe the log this time:

Starting NON-BLOCKING Controller!

Exiting NON-BLOCKING Controller!

Tweet(text=RestTemplate rules, username=@user1)

Tweet(text=WebClient is better, username=@user2)

Tweet(text=OK, both are useful, username=@user1)

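The difference is the log order. The controller method exits before any tweet is logged, because subscribe() does not block and the tweets are logged later, as they arrive. A Flux is also lazy: nothing happens until something subscribes to it.

Another example of reactive code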

return nomineesRepository
        .findByYearGreaterThanEqual(year)
        .switchIfEmpty(Flux.error(new NomineesNotFoundException()))
        .filter(NomineesResponse::isWinner)
        .delayElements(Duration.ofMillis(1L))
        .onBackpressureBuffer(1)
        .map(nomineesResponse -> {
            nomineesResponse.setEntity("changed the entity");
            return nomineesResponse;
        });

Reactive Testing Support

Whether you have written a simple chain of Reactor operators or your own operator, automated testing is always a good idea.

You can ask and answer questions such as the following:

1. What is the next expected event?

2. Do you expect the Flux to emit a particular value?

3. Or maybe to do nothing for the next 300ms?

Example code: testing reactive method security

The tests below use StepVerifier to check which signals findMessage() emits for an unauthenticated caller, for an ordinary user, and for an admin:

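import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import reactor.test.StepVerifier;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = HelloWebfluxMethodApplication.class)
public class HelloWorldMessageServiceTests {

    @Autowired
    HelloWorldMessageService messages;

    // No authentication at all: the Mono must terminate with AccessDeniedException
    @Test
    public void messagesWhenNotAuthenticatedThenDenied() {
        StepVerifier.create(this.messages.findMessage())
                .expectError(AccessDeniedException.class)
                .verify();
    }

    // Authenticated, but without the ADMIN role: still denied
    @Test
    @WithMockUser
    public void messagesWhenUserThenDenied() {
        StepVerifier.create(this.messages.findMessage())
                .expectError(AccessDeniedException.class)
                .verify();
    }

    // ADMIN role: the message is emitted and the sequence completes
    @Test
    @WithMockUser(roles = "ADMIN")
    public void messagesWhenAdminThenOk() {
        StepVerifier.create(this.messages.findMessage())
                .expectNext("Hello World!")
                .verifyComplete();
    }
}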

What are the advantages and disadvantages?

Advantages

  • Cleaner code, more concise
  • Easier to read (once you get the hang of it)
  • Easier to scale (pipe any operation)
  • Better error handling
  • Event-driven by design, so it plays well with streams (Kafka, RabbitMQ, etc.)
  • Backpressure (client can control flow)
  • Non-blocking I/O operations (which can mean better response times under load)

Disadvantages

  • Often more memory-intensive, because streams of data have to be held over time.
  • Might feel unconventional to learn at the start (everything needs to be a stream).
  • Different programming style compared to conventional Java.
  • Everything has to be wrapped in a Mono or Flux.

When to use?

  • To achieve high throughput
  • When there is heavy data flow in the system

When not to use?

  • When there is not much data to process within the system

What are Reactive Systems?

Reactive systems are not the same thing as reactive programming. Reactive programming is used at the code level, while reactive systems deal with architecture. A reactive system is characterized by four traits:

  • Responsiveness: Be available for users and, whatever happens (overload, failure, etc.), be ready to respond to them.
  • Resilience: Stay immune to faults, disruptions, and extremely high loads.
  • Elasticity: Use resources efficiently and, depending on the load, either adjust the capacity of each machine (vertical scaling up or down) or adjust the number of machines involved (horizontal scaling up or down).
  • Message-driven character: Embrace completely non-blocking communication by sending immutable messages to addressable recipients.

Sample Github repo: https://github.com/mNasiruddin/project-reactor

References

https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html

https://projectreactor.io/docs/core/release/reference/index.html

https://docs.spring.io/spring-security/site/docs/current/reference/html/test-webflux.html


Written by Mohammed Nasiruddin

Passionate Software Engineer, Interested in Java, Microservices, Event-Driven Pattern, RxJava, Docker … to quantum computing
