1️⃣ subscribeOn – "Decides WHERE the Pipeline Starts"
Definition:
- subscribeOn influences the thread on which the data source (upstream) runs, e.g., data generation or API calls.
- It affects the source and everything downstream (until a publishOn switches it).
Key Insight:
- subscribeOn affects the beginning (source) and flows down.
- subscribeOn doesn’t just switch one stage; it affects everything downstream unless overridden by publishOn.
2️⃣ publishOn – "Switches the Thread at a Certain Point"
Definition:
- publishOn switches the thread context at the point where it appears.
- It affects only downstream operators.
Key Insight:
- publishOn switches the thread only from where it’s placed.
- Upstream (before publishOn) stays on the original thread (e.g., main).
Feature | subscribeOn | publishOn |
---|---|---|
Thread Affected | Affects source and downstream | Affects only downstream from where used |
Usage | To control source/initial thread | To switch context mid-pipeline |
Common Use Case | I/O sources (DB, network calls) | Switching from CPU-bound to I/O or vice versa |
Placement Impact | Position in the chain does not matter; it always affects the source (with several subscribeOn calls, the one closest to the source wins) | Position matters; affects only operators downstream of it |
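To make the table concrete, here is a minimal sketch that records which scheduler's thread runs each stage. The class name ThreadHopDemo and the helper poolOf are made up for this illustration, and it assumes reactor-core is on the classpath. Note that subscribeOn is deliberately placed last, yet it still decides where the source runs.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of Reactor.
final class ThreadHopDemo {

    // Records which scheduler's thread ran each stage of the pipeline.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();

        Flux.range(1, 2)
            // Runs on the subscribeOn scheduler, even though subscribeOn appears later.
            .doOnNext(i -> log.add("source:" + poolOf(Thread.currentThread())))
            .publishOn(Schedulers.parallel())
            // Everything after publishOn runs on the parallel scheduler.
            .doOnNext(i -> log.add("afterPublishOn:" + poolOf(Thread.currentThread())))
            .subscribeOn(Schedulers.boundedElastic())
            .blockLast(); // blocking only so the demo can return its log

        return log;
    }

    // Strips the numeric suffix, e.g. "boundedElastic-1" -> "boundedElastic".
    private static String poolOf(Thread t) {
        String name = t.getName();
        int dash = name.lastIndexOf('-');
        return dash < 0 ? name : name.substring(0, dash);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The order of the log entries can differ between runs because two threads write to it; only the pairing of stage and scheduler is stable.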
1️⃣ What is defer in Reactor?
Definition:
- Mono.defer or Flux.defer delays the creation of the actual Publisher (Mono/Flux) until a subscriber subscribes.
- Every subscriber gets a new instance of the Publisher, which is created lazily.
This is especially useful when you:
- Want to wrap a method call (like DB access, event publishing, etc.).
- Want to ensure the call happens only when subscribed (and not during definition of the pipeline).
- Want to ensure threading behavior (as subscribeOn affects from the point of subscription).
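The "new instance per subscriber" point can be checked with a counter. The sketch below is only an illustration: DeferDemo, CALLS and loadFromDb are hypothetical names, with loadFromDb standing in for a real DB call, and block() is used purely to keep the demo synchronous.

```java
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Reactor.
final class DeferDemo {

    static final AtomicInteger CALLS = new AtomicInteger();

    // Stand-in for a DB call: each invocation returns the next counter value.
    static Mono<Integer> loadFromDb() {
        return Mono.just(CALLS.incrementAndGet());
    }

    static List<Integer> run() {
        CALLS.set(0);
        Mono<Integer> eager = loadFromDb();                     // method runs right here, once
        Mono<Integer> lazy = Mono.defer(DeferDemo::loadFromDb); // method has not run yet

        int callsAfterAssembly = CALLS.get(); // only the eager call has happened
        int eager1 = eager.block();           // replays the value captured at assembly
        int eager2 = eager.block();           // same value again
        int lazy1 = lazy.block();             // first subscription triggers a fresh call
        int lazy2 = lazy.block();             // second subscription triggers another one
        return List.of(callsAfterAssembly, eager1, eager2, lazy1, lazy2);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 1, 1, 2, 3]
    }
}
```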
2️⃣ Example: Without defer
Let’s say we call a DB method that returns a Mono.
public Mono<String> getDataFromDb() {
    System.out.println("Calling DB on thread: " + Thread.currentThread().getName());
    return Mono.just("Data");
}
Mono<String> result = getDataFromDb()
    .subscribeOn(Schedulers.boundedElastic())
    .doOnNext(data -> System.out.println("Got: " + data + " on " + Thread.currentThread().getName()));
result.subscribe();
✅ Observation:
- getDataFromDb() is called immediately, when defining result.
- Even though subscribeOn is used, the method is called on the main thread.
3️⃣ Example: With defer
Mono<String> result = Mono.defer(() -> {
        System.out.println("Calling DB on thread: " + Thread.currentThread().getName());
        return Mono.just("Data");
    })
    .subscribeOn(Schedulers.boundedElastic())
    .doOnNext(data -> System.out.println("Got: " + data + " on " + Thread.currentThread().getName()));
result.subscribe();
Output
Calling DB on thread: boundedElastic-1
Got: Data on boundedElastic-1
Observation:
- Now, defer delays the DB call until subscription.
- Because subscribeOn controls the thread where subscription happens, the DB call is made on the boundedElastic thread.
4️⃣ defer + publishOn – What Happens?
Mono<String> result = Mono.defer(() -> {
        System.out.println("Calling DB on thread: " + Thread.currentThread().getName());
        return Mono.just("Data");
    })
    .subscribeOn(Schedulers.boundedElastic()) // Controls where subscription & upstream happen
    .publishOn(Schedulers.parallel()) // Switch thread for downstream
    .map(data -> {
        System.out.println("Processing on thread: " + Thread.currentThread().getName());
        return data.toUpperCase();
    })
    .doOnNext(data -> System.out.println("Final: " + data + " on " + Thread.currentThread().getName()));
result.subscribe();
Output
Calling DB on thread: boundedElastic-1
Processing on thread: parallel-1
Final: DATA on parallel-1
✅ Observation:
- defer ensures the DB call is on boundedElastic (because of subscribeOn).
- publishOn switches the thread after the DB call (for downstream processing).
Concept | Meaning |
---|---|
defer | Wraps a supplier so it’s called lazily at subscription time, not pipeline definition. |
subscribeOn | Controls thread of subscription & upstream (including defer calls). |
publishOn | Switches thread from that point downstream. |
---------------------------------End--------------------------------
Schedulers in Reactor: Deep Dive
Schedulers are the backbone of threading in Reactor. They control where and how your reactive pipeline’s tasks run.
Let’s start by listing common Reactor Schedulers:
- Schedulers.immediate()
- Schedulers.single()
- Schedulers.parallel()
- Schedulers.boundedElastic()
- Schedulers.newSingle(name)
- Schedulers.fromExecutorService(executor)
1️⃣ Schedulers.immediate()
Definition:
Runs tasks on the current thread (no thread switch).
Use Case:
- When you don’t want threading overhead (for example, when already on a background thread).
- For debugging or very lightweight tasks.
Mono.just("data")
    .subscribeOn(Schedulers.immediate())
    .doOnNext(d -> System.out.println("Immediate: " + Thread.currentThread().getName()))
    .subscribe();
Output
Immediate: main
2️⃣ Schedulers.single()
Definition:
A single-threaded scheduler backed by one thread (shared across all usages).
Use Case:
- Sequential task processing (e.g., writing logs, file writes, or certain transactional operations).
- Avoids concurrency issues.
Example
Mono.just("single-threaded")
    .subscribeOn(Schedulers.single())
    .doOnNext(d -> System.out.println("Single: " + Thread.currentThread().getName()))
    .subscribe();
Output
Single: single-1
3️⃣ Schedulers.parallel()
Definition:
A fixed-size thread pool, typically based on the number of CPU cores (Runtime.getRuntime().availableProcessors()).
Use Case:
- CPU-bound tasks (e.g., intensive calculations, transformation logic).
- Leverage multiple cores for parallelism.
Flux.range(1, 5)
    .parallel() // Makes parallel flux
    .runOn(Schedulers.parallel())
    .doOnNext(i -> System.out.println("Parallel: " + i + " on " + Thread.currentThread().getName()))
    .sequential()
    .subscribe();
Output (from a 2-core machine; thread names and ordering vary with the core count)
Parallel: 1 on parallel-1
Parallel: 2 on parallel-2
Parallel: 3 on parallel-1
Parallel: 4 on parallel-2
Parallel: 5 on parallel-1
4️⃣ Schedulers.boundedElastic()
Definition:
An elastic thread pool with limits. Creates threads as needed (like newCachedThreadPool) but bounded to avoid resource exhaustion.
Use Case:
- I/O-bound operations:
  - Blocking DB calls (JDBC)
  - File reads/writes
  - Calling external APIs
- When the number of concurrent threads could be large but controlled.
Mono.fromCallable(() -> {
        Thread.sleep(1000); // Simulate blocking call
        return "Elastic data";
    })
    .subscribeOn(Schedulers.boundedElastic())
    .doOnNext(d -> System.out.println("Elastic: " + d + " on " + Thread.currentThread().getName()))
    .subscribe();
Output
Elastic: Elastic data on boundedElastic-1
5️⃣ Schedulers.newSingle("custom")
Definition:
Creates a new single-threaded Scheduler with a custom thread name.
Use Case:
- When you need an isolated thread for a specific purpose (e.g., a dedicated logger or a sensitive task).
Scheduler myScheduler = Schedulers.newSingle("my-thread");
Mono.just("custom")
    .subscribeOn(myScheduler)
    .doOnNext(d -> System.out.println("Custom: " + d + " on " + Thread.currentThread().getName()))
    .subscribe();
Output
Custom: custom on my-thread-1
6️⃣ Schedulers.fromExecutorService()
Definition:
Wraps a custom ExecutorService (e.g., a Java thread pool) into a Scheduler.
Use Case:
- When integrating with an existing thread pool (from legacy systems or custom configurations).
Example:
ExecutorService executor = Executors.newFixedThreadPool(4);
Scheduler customScheduler = Schedulers.fromExecutorService(executor);
Mono.just("custom-executor")
.subscribeOn(customScheduler)
.doOnNext(d -> System.out.println("Executor: " + d + " on " + Thread.currentThread().getName()))
.subscribe();
Scheduler | Threads | Use Case |
---|---|---|
immediate | Current thread | No threading |
single | 1 shared thread | Sequential tasks |
parallel | CPU cores | CPU-bound parallel tasks |
boundedElastic | Dynamic, bounded pool | I/O-bound, blocking tasks |
newSingle("name") | 1 new thread | Isolated, named thread |
fromExecutorService | Custom pool | Integrate legacy thread pools |
------------------------End----------------------------
map vs flatMap in Reactor: Detailed Breakdown
1️⃣ map()
What is it?
- Synchronous transformation of data.
- It maps (transforms) one value to another in a one-to-one fashion.
- Input type → Output type transformation.
Key Points:
- Takes a Function<T, R>: input type T to output type R.
- Runs in the same thread where it’s called (no async switch).
- Great for simple data transformations (e.g., uppercasing a string, adding a number).
Mono<String> mono = Mono.just("hello");
mono.map(s -> s.toUpperCase()) // Transform "hello" → "HELLO"
    .subscribe(result -> System.out.println("Result: " + result));
Output
Result: HELLO
2️⃣ flatMap()
What is it?
- Asynchronous or nested transformation.
- It maps a value to a Publisher (Mono or Flux), and then flattens the result.
- Useful for chaining async calls (like DB access, web calls).
Key Points:
- Takes a Function<T, Publisher<R>>.
- It flattens nested streams into a single stream.
- Often used for asynchronous workflows where each step depends on the result of the previous one.
Example:
Mono<String> mono = Mono.just("user123");
// Simulate async DB call
mono.flatMap(id -> Mono.just("User Name for " + id))
    .subscribe(result -> System.out.println("Result: " + result));
Output
Result: User Name for user123
Flattening Example (No Async, No Threading)
Here’s a basic flattening example:
Flux<String> source = Flux.just("A", "B", "C");
// flatMap each letter to a Flux of two letters (like "A" → ["A1", "A2"])
Flux<String> flattened = source.flatMap(letter -> Flux.just(letter + "1", letter + "2"));
flattened.subscribe(System.out::println);
What does "flattening" mean in flatMap?
When you call flatMap, it maps each item to a new Publisher (Mono/Flux), and then flattens those inner Publishers into the main stream.
Imagine:
- You start with Mono<T> or Flux<T>.
- Each T maps to a Publisher<R> (Mono or Flux).
- Instead of ending up with nested Publishers (Mono<Mono<R>> or Flux<Flux<R>>), Reactor "flattens" them into Mono<R> or Flux<R>.
Flux<T> --flatMap--> Flux<Publisher<R>> ---flatten---> Flux<R>
Mono<T> --flatMap--> Mono<Publisher<R>> ---flatten---> Mono<R>
Async Example Without Explicit subscribeOn
Now, what if the inner Publisher itself is asynchronous (e.g., network call, DB call, timer)? Even if you don’t specify subscribeOn, the inner Publisher’s behavior can introduce asynchrony.
Flux<String> source = Flux.just("A", "B", "C");
// Simulate async operation using delayElements
Flux<String> flattened = source.flatMap(letter ->
Flux.just(letter + "1", letter + "2").delayElements(Duration.ofMillis(100))
);
flattened.subscribe(result ->
System.out.println("Received: " + result + " on " + Thread.currentThread().getName())
);
What’s Happening?
- The outer Flux is processed synchronously.
- The inner Flux has delayElements, which schedules its elements on Schedulers.parallel() by default (async).
- This introduces asynchrony, even though we didn’t call subscribeOn.
- flatMap subscribes to all inner Publishers eagerly, so their elements interleave instead of arriving letter by letter.
Example output (thread names and interleaving can vary between runs and machines):
Received: A1 on parallel-1
Received: B1 on parallel-2
Received: C1 on parallel-3
Received: A2 on parallel-1
Received: B2 on parallel-2
Received: C2 on parallel-3
Why Async Without Explicit Threading?
✅ Reactor has time-based operators (like delayElements) that internally use Schedulers, and sources such as fromFuture that emit on whichever thread completes the underlying future.
✅ Even if you don’t specify subscribeOn, these operators introduce scheduling (e.g., Schedulers.parallel() for delayElements).
✅ So, flatMap’s inner Publishers can be async if the operation inside is async.
Feature | map | flatMap |
---|---|---|
Sync/Async | Synchronous | Asynchronous or nested |
Transformation | One-to-one | One-to-many (flattens inner Publisher) |
Threading | Same thread | Depends on inner publisher's thread |
Use case | Simple transformation | Chaining async/complex calls |
----------------------------_----End---------------------------
1️⃣ What is timeout in Reactor?
timeout is an operator used in Project Reactor (Mono/Flux) to specify the maximum time you’re willing to wait for an item from the upstream Publisher.
- If the upstream doesn’t emit within the specified duration, it terminates the sequence with a TimeoutException (or can optionally switch to a fallback sequence).
Analogy:
Imagine you’re waiting for a pizza delivery. You say:
“I’ll wait 30 minutes. If it’s not here by then, I’ll order from a different place.”
2️⃣ Basic Usage of timeout
Example 1: Simple Timeout
Flux<String> slowFlux = Flux.just("A", "B", "C")
    .delayElements(Duration.ofSeconds(2));
slowFlux
    .timeout(Duration.ofSeconds(1)) // If any element takes >1s, error
    .subscribe(
        item -> System.out.println("Received: " + item),
        err -> System.err.println("Error: " + err)
    );
Explanation:
- delayElements(2s) delays each item by 2 seconds.
- timeout(1s) expects each element within 1 second.
- If any item is delayed beyond 1s, it triggers TimeoutException.
3️⃣ Timeout With Fallback
Instead of terminating with an error, we can switch to a fallback:
slowFlux
.timeout(Duration.ofSeconds(1), Flux.just("Fallback", "Default"))
.subscribe(
item -> System.out.println("Received: " + item),
err -> System.err.println("Error: " + err)
);
Now, if a timeout occurs:
- Instead of an error, the stream switches to Flux.just("Fallback", "Default").
How Does It Work Under the Hood?
- timeout internally sets up a timer on a Scheduler (Schedulers.parallel() by default; an overload accepts a custom Scheduler).
- If an item does not arrive in time, it:
  - Either cancels the upstream and emits a TimeoutException (default behavior).
  - Or cancels the upstream and switches to a fallback Publisher (if provided).
Threading: The timeout timer runs on that separate scheduler, so it does not block the main thread.
Operator | Description |
---|---|
timeout(Duration) | Waits at most Duration for the next item, errors on timeout. |
timeout(Duration, Publisher fallback) | On timeout, switches to the fallback Publisher instead of erroring. |
timeout(Publisher firstTimeout) | The timeout for the first item is determined by the first signal from firstTimeout. |
timeout(Publisher first, Function) | Dynamically control timeouts per item. |
Example: Dynamic Timeout Per Item
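Flux<String> items = Flux.just("A", "B", "C");
// The first argument bounds the wait for the first item (1s here).
// The function receives each emitted item and returns a Publisher whose
// first signal bounds the wait for the NEXT item: 1s after "A", 2s otherwise.
items
    .timeout(
        Mono.delay(Duration.ofSeconds(1)),
        item -> {
            if (item.equals("A")) return Mono.delay(Duration.ofSeconds(1));
            else return Mono.delay(Duration.ofSeconds(2));
        }
    )
    .subscribe(
        System.out::println,
        err -> System.err.println("Error: " + err)
    );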
⚠️ 6️⃣ Common Pitfalls
1️⃣ Timeout Is Per Item:
Timeout applies to each item individually. So even if the stream emits items regularly, any item that takes too long triggers timeout.
2️⃣ Scheduler Overhead:
Each timeout sets up a timer. For very high-frequency streams, too many timers can cause overhead.
3️⃣ Fallback Must Be Reactive:
If you use fallback, it must be a Publisher, not a static value.
Concept | Key Points |
---|---|
timeout | Sets max wait time for each item. Emits an error or switches to a fallback. |
Fallback | Optional Publisher to continue on timeout instead of error. |
Dynamic timeout | Custom timeout per item using a Function. |
Threading | Uses an internal Scheduler (Schedulers.parallel() by default) for managing timeout logic. |
Use Cases | DB queries, WebClient calls, external systems with variable response times. |
Key onError Methods:
Method | Purpose | Example |
---|---|---|
onErrorReturn(T fallback) | Return a fallback value when an error occurs | .onErrorReturn("Fallback value") |
onErrorResume(Function) | Switch to a fallback Publisher (Mono/Flux) when an error occurs | .onErrorResume(e -> Flux.just("Fallback")) |
onErrorMap(Function) | Transform the error into another error (e.g., wrapping) | .onErrorMap(e -> new CustomException(e)) |
onErrorContinue(BiConsumer) | Log or skip the error and continue with the next item | .onErrorContinue((e, item) -> log.warn(...)) |
doOnError(Consumer) | Side-effect: perform an action on error but don’t recover | .doOnError(e -> log.error("Error: " + e)) |
⚠️ onErrorReturn vs onErrorResume
- onErrorReturn: emits one fallback value and terminates.
- onErrorResume: switches to a new sequence (Flux/Mono) and continues.
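The difference is easiest to see side by side. The following is a small sketch under a few assumptions: ErrorHandlingDemo and its failing() source are invented for this illustration, and block()/collectList() are used only so the results can be returned synchronously.

```java
import reactor.core.publisher.Flux;

import java.util.List;

// Hypothetical demo class, not part of Reactor.
final class ErrorHandlingDemo {

    // A source that emits 1 and 2, then fails on the third element.
    static Flux<Integer> failing() {
        return Flux.range(1, 3).map(i -> {
            if (i == 3) throw new IllegalStateException("boom");
            return i;
        });
    }

    // onErrorReturn: one fallback value, then the sequence completes.
    static List<Integer> withReturn() {
        return failing().onErrorReturn(-1).collectList().block();
    }

    // onErrorResume: the whole fallback sequence is appended after the error point.
    static List<Integer> withResume() {
        return failing().onErrorResume(e -> Flux.just(10, 20)).collectList().block();
    }

    // onErrorMap: the error is still propagated, just as a different exception type.
    static String withMap() {
        try {
            failing()
                .onErrorMap(e -> new IllegalArgumentException("wrapped: " + e.getMessage(), e))
                .blockLast();
            return "no error";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(withReturn()); // prints [1, 2, -1]
        System.out.println(withResume()); // prints [1, 2, 10, 20]
        System.out.println(withMap());    // prints wrapped: boom
    }
}
```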
-------------------------------------End---------------------
2️⃣ subscribe() Overloads in Reactor
subscribe() in Project Reactor
In Reactor, nothing happens until you call subscribe(). It is a terminal operation that starts the data flow in the reactive pipeline (aka subscription to the Publisher).
subscribe();
subscribe(Consumer<? super T> consumer);
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
✅ 1. subscribe()
- Use Case: Fire-and-forget (e.g., logging, debugging, testing).
- No way to observe emitted values or errors.
Flux.just("A", "B", "C")
    .map(String::toLowerCase)
    .subscribe();
✅ 2. subscribe(Consumer<T> onNext)
- Use Case: You care about the data but not errors or completion.
- Best for simple flows like logging results.
Mono.just("Hello")
    .subscribe(data -> System.out.println("Received: " + data));
✅ 3. subscribe(onNext, onError)
- Use Case: Want to handle data and errors.
- Recommended for robust code.
Mono.just("A")
    .map(s -> { throw new RuntimeException("Boom!"); })
    .subscribe(
        data -> System.out.println("Received: " + data),
        error -> System.err.println("Error: " + error.getMessage())
    );
✅ 4. subscribe(onNext, onError, onComplete)
- Use Case: You want to know when the stream completes successfully.
Flux.range(1, 3)
    .subscribe(
        data -> System.out.println("Got: " + data),
        error -> System.err.println("Error: " + error),
        () -> System.out.println("Stream completed.")
    );
✅ 5. subscribe(onNext, onError, onComplete, onSubscribe)
- Use Case: Full control, including backpressure (you can control how many items to request).
- The onSubscribe gives access to the Subscription.
Flux.range(1, 10)
    .subscribe(
        data -> System.out.println("Item: " + data),
        err -> System.err.println("Error: " + err),
        () -> System.out.println("Done"),
        subscription -> {
            System.out.println("Subscribed!");
            subscription.request(5); // Request only 5 items
        }
    );
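As far as I recall, newer Reactor releases mark the overload that takes a Consumer<Subscription> as deprecated and point to subscribeWith(Subscriber) instead, so treat that detail as something to verify against your version. A possible alternative for controlling demand is BaseSubscriber. The sketch below assumes reactor-core on the classpath; the class name BackpressureDemo is made up for the example.

```java
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of Reactor.
final class BackpressureDemo {

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();

        Flux.range(1, 10).subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(5); // initial demand: only 5 items
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
                // No further request(...) here, so the source stops after 5 items.
            }
        });

        // Flux.range emits synchronously on the calling thread,
        // so the list is already filled when subscribe(...) returns.
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3, 4, 5]
    }
}
```

Calling request(1) inside hookOnNext would turn this into a one-at-a-time consumer.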
When to Use What?
Scenario | Recommended subscribe Variant |
---|---|
You don't care about results | subscribe() |
You want only data | subscribe(onNext) |
You want to handle data + errors | subscribe(onNext, onError) |
You want to handle completion | subscribe(onNext, onError, onComplete) |
You want fine control (e.g., backpressure) | subscribe(onNext, onError, onComplete, onSubscribe) |
block() vs subscribe()
Feature | subscribe() | block() |
---|---|---|
Type | Non-blocking | Blocking |
Use in | Reactive chains (preferred) | Testing, bridging to sync |
Thread | Doesn’t block calling thread | Blocks current thread |
Real-World Use Cases
Use Case | Approach |
---|---|
Logging events | subscribe(event -> log.info("Event: {}", event)) |
REST controller (WebFlux) | Return Mono/Flux, no need to call subscribe() manually |
Side-effects (e.g., DB insert) | subscribe() with proper error/complete handling |
Integration test (wait result) | Use .block() |
Controlled data consumption | Use 4-arg subscribe() with backpressure |
---------------------------------End-----------------
⏱️ elapsed() – What It Does
Flux<T> → Flux<Tuple2<Long, T>>
The .elapsed() operator measures the time between consecutive signals. It transforms each item into a Tuple2<Long, T>:
- Long: time in milliseconds since the previous item was emitted (or since subscription for the first item).
- T: the original value.
Use Case
Use .elapsed() to:
- Debug performance or latency issues
- Profile slow publishers
- Measure time gaps between emissions
- Observe delays in streams
Flux.just("A", "B", "C")
    .delayElements(Duration.ofMillis(300))
    .elapsed()
    .subscribe(tuple -> System.out.println("Elapsed: " + tuple.getT1() + " ms, Value: " + tuple.getT2()));
⛏️ Breakdown of .elapsed() Internals:
- Under the hood, it reads the current time from a Scheduler's clock (the parallel scheduler by default) and reports the difference between signals in milliseconds.
- The first item is timed from the moment of subscription.
- Works with Flux or Mono (single value).
Real-World Example
Measure API call time:
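One way this could look is sketched below. Nothing here comes from a real API client: ElapsedDemo and callApi are hypothetical, and the 200 ms sleep merely stands in for network latency.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

// Hypothetical demo class, not part of Reactor.
final class ElapsedDemo {

    // Stand-in for a slow remote call; the sleep simulates latency.
    static Mono<String> callApi() {
        return Mono.fromCallable(() -> {
            Thread.sleep(200);
            return "response";
        }).subscribeOn(Schedulers.boundedElastic());
    }

    // Pairs the response with the milliseconds elapsed since subscription.
    static Tuple2<Long, String> timedCall() {
        return callApi()
            .elapsed()
            .block(); // blocking only to keep the demo synchronous
    }

    public static void main(String[] args) {
        Tuple2<Long, String> result = timedCall();
        System.out.println("API took " + result.getT1() + " ms, value: " + result.getT2());
    }
}
```

In a real pipeline you would typically log the first tuple element in doOnNext and then map back to the second element, rather than blocking.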
subscribe(...)?
❌ Harder to test and reason about
✅ Better to use doOnNext() for logging and keep subscribe() clean and purpose-focused.
✅ Best Practice: Use doOn... for Observability, subscribe() for Execution
This separation makes your pipelines:
- Easier to read
- More composable and testable
- Cleaner for production instrumentation (e.g., Micrometer, tracing)
doOnNext() calls!
Task | Use This |
---|---|
Logging | doOnNext, doOnError |
Metrics / Tracing | doOnNext, doOnComplete |
Side effects (non-intrusive) | doOn... methods |
Business logic / API calls | map, flatMap, concatMap, switchMap |
Terminal consumer | subscribe() |
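A short sketch of that split is shown here, with a plain list standing in for a logger or metrics sink. The names DoOnDemo and events are assumptions made for the example.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of Reactor.
final class DoOnDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>(); // stands in for a logger/metrics sink

        Flux.just("a", "b")
            .doOnSubscribe(s -> events.add("subscribed"))   // observation only
            .map(String::toUpperCase)                       // business logic
            .doOnNext(v -> events.add("next:" + v))         // observation only
            .doOnComplete(() -> events.add("complete"))     // observation only
            .subscribe();                                   // terminal consumer stays empty

        return events; // the source is synchronous, so all events are already recorded
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [subscribed, next:A, next:B, complete]
    }
}
```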
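1. contextCapture() – Bridge for context propagation
What it does:
It captures ThreadLocal values at subscription time and stores them in the Reactor Context (this relies on the Micrometer context-propagation library being on the classpath), so they stay available to the operators above it, even across thread boundaries.
Use Case:
You keep some metadata (like requestId) in a ThreadLocal, for example a logging MDC, and want it to travel with the pipeline into doOnNext, flatMap, etc. Values you add to the Context yourself are written with contextWrite instead.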
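Since contextCapture() depends on an extra library, the sketch below sticks to the plain Context API (contextWrite plus Mono.deferContextual) to show a requestId reaching an operator. It is an illustration only; ContextDemo is a made-up class name and the key "requestId" is simply the one used in this post.

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

// Hypothetical demo class, not part of Reactor.
final class ContextDemo {

    static String run() {
        return Mono.just("payload")
            // deferContextual reads the Context visible at subscription time.
            .flatMap(data -> Mono.deferContextual(ctx ->
                Mono.just(data + " handled for " + ctx.get("requestId"))))
            // contextWrite must come AFTER the operators that read the value,
            // because the Context propagates from the subscriber upwards.
            .contextWrite(Context.of("requestId", "REQ-123"))
            .block(); // blocking only to keep the demo synchronous
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints payload handled for REQ-123
    }
}
```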
-------------End---------------------------
All Together
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import java.time.Duration;

public class ReactiveProductionExample {
    public static void main(String[] args) {
        Flux.defer(() -> Flux.just("request-payload", "request-payload1", "request-payload2"))
            // 1️⃣ SubscribeOn sets the thread where subscription (upstream) starts
            .subscribeOn(Schedulers.boundedElastic())
            // 2️⃣ Setup an artificial delay or API call with timeout
            .flatMap(data -> simulateExternalApi(data)
                .elapsed() // 3️⃣ elapsed - for measuring latency
                .doOnNext(tuple -> System.out.println("API Time taken: " + tuple.getT1() + "ms"))
                .map(tuple -> tuple.getT2()) // Extract the actual data
                .timeout(Duration.ofMillis(500)) // 4️⃣ timeout protection
                .onErrorResume(error -> fallback(data, error)) // 5️⃣ graceful fallback
            )
            // 6️⃣ PublishOn switches downstream flow to new thread (e.g., CPU intensive)
            .publishOn(Schedulers.parallel())
            // 7️⃣ doOn methods for side effects: logging, metrics (not modifying flow)
            .doOnNext(result -> System.out.println("[Result Ready]: " + result))
            .doOnError(err -> System.err.println("[ERROR]: " + err.getMessage()))
            .doOnSubscribe(subscription -> System.out.println("[Subscribed]"))
            // .doOnSuccess(r -> System.out.println("[Success]") )
            .doOnComplete(() -> System.out.println("[Success]"))
            // 8️⃣ contextWrite for logging/tracing IDs
            .contextWrite(Context.of("requestId", "REQ-123"))
            // 9️⃣ Finally subscribe to trigger the chain
            .subscribe(
                data -> System.out.println("Final data: " + data),
                error -> System.err.println("Terminal Error: " + error),
                () -> System.out.println("Done")
            );
        // Sleep to allow async threads to complete
        try { Thread.sleep(2000); } catch (Exception e) {}
    }

    static Mono<String> simulateExternalApi(String data) {
        return Mono.fromCallable(() -> {
            Thread.sleep(300); // Simulate delay
            return "response-for-" + data;
        }).subscribeOn(Schedulers.boundedElastic());
    }

    static Mono<String> fallback(String input, Throwable error) {
        System.err.println("[Fallback triggered for: " + input + "]");
        return Mono.just("fallback-for-" + input);
    }
}
Output
[Subscribed]
API Time taken: 303ms
[Result Ready]: response-for-request-payload
Final data: response-for-request-payload
API Time taken: 303ms
API Time taken: 305ms
[Result Ready]: response-for-request-payload1
Final data: response-for-request-payload1
[Result Ready]: response-for-request-payload2
Final data: response-for-request-payload2
[Success]
Done