๐น
1️⃣ subscribeOn – "Decides WHERE the Pipeline Starts"
๐ Definition:
-
subscribeOninfluences the thread where the data source (upstream) (e.g., data generation, API calls) runs. -
It affects the source and everything downstream (until a
publishOnswitches it).
Flux<Integer> flux = Flux.range(1, 3)
.doOnNext(i -> System.out.println("[Generating] " + i + " on " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.boundedElastic()) // Change starting thread
.map(i -> {
System.out.println("[Processing] " + i + " on " + Thread.currentThread().getName());
return i * 10;
});
flux.blockLast();
Output:
[Generating] 1 on boundedElastic-1 [Processing] 1 on boundedElastic-1 [Generating] 2 on boundedElastic-1 [Processing] 2 on boundedElastic-1 [Generating] 3 on boundedElastic-1 [Processing] 3 on boundedElastic-1
๐ข Key Insight:
-
subscribeOnaffects the beginning (source) and flows down. -
subscribeOndoesn't just switch one stage; it affects everything downstream unless overridden bypublishOn.
2️⃣ publishOn – "Switches the Thread at a Certain
Point"
๐ Definition:
-
publishOnswitches the thread context at the point where it appears. - It affects only downstream operators.
Flux<Integer> flux = Flux.range(1, 3)
.doOnNext(i -> System.out.println("[Generating] " + i + " on " + Thread.currentThread().getName()))
.publishOn(Schedulers.parallel()) // Switch thread here
.map(i -> {
System.out.println("[Processing] " + i + " on " + Thread.currentThread().getName());
return i * 10;
});
flux.blockLast();
Output:
[Generating] 1 on main [Generating] 2 on main [Generating] 3 on main [Processing] 1 on parallel-1 [Processing] 2 on parallel-1 [Processing] 3 on parallel-1
Key Insight:
-
publishOnswitches thread only from where it's placed. -
Upstream (before
publishOn) stays on original thread (e.g.,main).
Combining Both: Full Example
Flux<Integer> flux = Flux.range(1, 3)
.doOnNext(i -> System.out.println("[Source] " + i + " on " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.boundedElastic()) // Run source on boundedElastic
.publishOn(Schedulers.parallel()) // Switch context for downstream
.map(i -> {
System.out.println("[Processing] " + i + " on " + Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.single()) // Switch context again
.doOnNext(i -> System.out.println("[Final] " + i + " on " + Thread.currentThread().getName()));
flux.blockLast();
Output:
[Source] 1 on boundedElastic-1 [Source] 2 on boundedElastic-1 [Source] 3 on boundedElastic-1 [Processing] 1 on parallel-1 [Processing] 2 on parallel-1 [Processing] 3 on parallel-1 [Final] 10 on single-1 [Final] 20 on single-1 [Final] 30 on single-1
| Feature | subscribeOn |
publishOn |
|---|---|---|
| Thread Affected | Affects source and downstream | Affects only downstream from where used |
| Usage | To control source/initial thread | To switch context mid-pipeline |
| Common Use Case | I/O sources (DB, network calls) | Switching from CPU-bound to I/O or vice versa |
| Placement Impact | Works wherever it is, but affects source | Works where it is, affects downstream |
๐ 1️⃣ What is defer in Reactor?
๐ Definition:
-
Mono.deferorFlux.deferdelays the creation of the actual Publisher (Mono/Flux) until a subscriber subscribes. - Every subscriber gets a new instance of the Publisher, which is created lazily.
๐ This is especially useful when you:
- Want to wrap a method call (like DB access, event publishing, etc.).
- Ensure the call happens only when subscribed (and not during definition of the pipeline).
-
Ensure threading behavior (as
subscribeOnaffects from the point of subscription).
2️⃣ Example: Without defer
public Mono<String> getDataFromDb() {
System.out.println("Calling DB on thread: " + Thread.currentThread().getName());
return Mono.just("Data");
}
Mono<String> result = getDataFromDb()
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> System.out.println("Got: " + data + " on " + Thread.currentThread().getName()));
result.subscribe();
Output:
Calling DB on thread: main Got: Data on boundedElastic-1
✅ Observation:
-
getDataFromDb()is called immediately, when definingresult. -
Even though
subscribeOnis used, the method is called onmainthread.
๐ฅ 3️⃣ Example: With defer
Mono<String> result = Mono.defer(() -> {
System.out.println("Calling DB on thread: " + Thread.currentThread().getName());
return Mono.just("Data");
})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> System.out.println("Got: " + data + " on " + Thread.currentThread().getName()));
result.subscribe();
Output:
Calling DB on thread: boundedElastic-1 Got: Data on boundedElastic-1
Observation:
-
Now,
deferdelays the DB call until subscription. -
Because
subscribeOncontrols the thread where subscription happens, the DB call is made on theboundedElasticthread.
4️⃣ defer + publishOn – What Happens?
Mono<String> result = Mono.defer(() -> {
System.out.println("Calling DB on thread: " + Thread.currentThread().getName());
return Mono.just("Data");
})
.subscribeOn(Schedulers.boundedElastic()) // Controls where subscription & upstream happen
.publishOn(Schedulers.parallel()) // Switch thread for downstream
.map(data -> {
System.out.println("Processing on thread: " + Thread.currentThread().getName());
return data.toUpperCase();
})
.doOnNext(data -> System.out.println("Final: " + data + " on " + Thread.currentThread().getName()));
result.subscribe();
Output:
Calling DB on thread: boundedElastic-1 Processing on thread: parallel-1 Final: DATA on parallel-1
✅ Observation:
-
deferensures DB call is on boundedElastic (because ofsubscribeOn). -
publishOnswitches thread after DB call (for downstream processing)
| Concept | Meaning |
|---|---|
| defer | Wraps a supplier so it's called lazily at subscription time, not pipeline definition. |
| subscribeOn | Controls thread of subscription & upstream (including defer calls). |
| publishOn | Switches thread from that point downstream. |
๐ Schedulers in Reactor: Deep Dive
Schedulers are the backbone of threading in Reactor. They control where and how your reactive pipeline's tasks run.
Let's start by listing common Reactor Schedulers:
Schedulers.immediate()Schedulers.single()Schedulers.parallel()Schedulers.boundedElastic()Schedulers.newSingle(name)Schedulers.fromExecutorService(executor)
๐ฏ 1️⃣ Schedulers.immediate()
๐ Definition:
Runs tasks on the current thread (no thread switch).
๐ Use Case:
- When you don't want threading overhead (for example, when already on a background thread).
- For debugging or very lightweight tasks.
Mono.just("data")
.subscribeOn(Schedulers.immediate())
.doOnNext(d -> System.out.println("Immediate: " + Thread.currentThread().getName()))
.subscribe();
Output:
Immediate: main
๐ฏ 2️⃣ Schedulers.single()
๐ Definition:
A single-threaded scheduler backed by one thread (shared
across all usages).
๐ Use Case:
- Sequential task processing (e.g., writing logs, file writes, or certain transactional operations).
- Avoids concurrency issues.
Mono.just("single-threaded")
.subscribeOn(Schedulers.single())
.doOnNext(d -> System.out.println("Single: " + Thread.currentThread().getName()))
.subscribe();
Output:
Single: reactor-single-1
๐ฏ 3️⃣ Schedulers.parallel()
๐ Definition:
A fixed-size thread pool, typically
based on the number of CPU cores
(Runtime.getRuntime().availableProcessors()).
๐ Use Case:
- CPU-bound tasks (e.g., intensive calculations, transformation logic).
- Leverage multiple cores for parallelism.
Flux.range(1, 5)
.parallel() // Makes parallel flux
.runOn(Schedulers.parallel())
.doOnNext(i -> System.out.println("Parallel: " + i + " on " + Thread.currentThread().getName()))
.sequential()
.subscribe();
Output:
Parallel: 1 on parallel-1 Parallel: 2 on parallel-2 Parallel: 3 on parallel-1 Parallel: 4 on parallel-2 Parallel: 5 on parallel-1
๐ฏ 4️⃣ Schedulers.boundedElastic()
๐ Definition:
An elastic thread pool with limits. Creates threads as needed
(like newCachedThreadPool) but bounded to avoid
resource exhaustion.
๐ Use Case:
-
I/O-bound operations:
- Blocking DB calls (JDBC)
- File reads/writes
- Calling external APIs
- When the number of concurrent threads could be large but controlled.
Mono.fromCallable(() -> {
Thread.sleep(1000); // Simulate blocking call
return "Elastic data";
})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(d -> System.out.println("Elastic: " + d + " on " + Thread.currentThread().getName()))
.subscribe();
Output:
Elastic: Elastic data on boundedElastic-1
๐ฏ 5️⃣ Schedulers.newSingle("custom")
๐ Definition:
Creates a new single-threaded Scheduler with a custom thread
name.
๐ Use Case:
- When you need an isolated thread for a specific purpose (e.g., a dedicated logger or a sensitive task).
Scheduler myScheduler = Schedulers.newSingle("my-thread");
Mono.just("custom")
.subscribeOn(myScheduler)
.doOnNext(d -> System.out.println("Custom: " + d + " on " + Thread.currentThread().getName()))
.subscribe();
Output:
Custom: custom on my-thread-1
๐ฏ 6️⃣ Schedulers.fromExecutorService()
๐ Definition:
Wraps a custom ExecutorService (e.g., a Java thread pool)
into a Scheduler.
๐ Use Case:
- When integrating with an existing thread pool (from legacy systems or custom configurations).
๐ป Example:
ExecutorService executor = Executors.newFixedThreadPool(4);
Scheduler customScheduler = Schedulers.fromExecutorService(executor);
Mono.just("custom-executor")
.subscribeOn(customScheduler)
.doOnNext(d -> System.out.println("Executor: " + d + " on " + Thread.currentThread().getName()))
.subscribe();
| Scheduler | Threads | Use Case |
|---|---|---|
immediate |
Current thread | No threading |
single |
1 shared thread | Sequential tasks |
parallel |
CPU cores | CPU-bound parallel tasks |
boundedElastic |
Dynamic, bounded pool | I/O-bound, blocking tasks |
newSingle("name") |
1 new thread | Isolated, named thread |
fromExecutorService |
Custom pool | Integrate legacy thread pools |
๐ฅ map vs flatMap in Reactor: Detailed Breakdown
1️⃣ map()
๐ What is it?
- Synchronous transformation of data.
- It maps (transforms) one value to another in a one-to-one fashion.
- Input type → Output type transformation.
๐ฅ Key Points:
-
Takes a
Function<T, R>: input typeTto output typeR. - Runs in the same thread where it's called (no async switch).
- Great for simple data transformations (e.g., uppercasing a string, adding a number).
Mono<String> mono = Mono.just("hello");
mono.map(s -> s.toUpperCase()) // Transform "hello" → "HELLO"
.subscribe(result -> System.out.println("Result: " + result));
Output:
Result: HELLO
๐ฏ 2️⃣ flatMap()
๐ What is it?
- Asynchronous or nested transformation.
- It maps a value to a Publisher (Mono or Flux), and then flattens the result.
- Useful for chaining async calls (like DB access, web calls).
๐ฅ Key Points:
- Takes a Function<T, Publisher<R>>.
- It flattens nested streams into a single stream.
- Often used for asynchronous workflows where each step depends on the result of the previous one.
๐ป Example:
Mono<String> mono = Mono.just("user123");
// Simulate async DB call
mono.flatMap(id -> Mono.just("User Name for " + id))
.subscribe(result -> System.out.println("Result: " + result));
Output:
Result: User Name for user123
Flattening Example (No Async, No Threading)
Flux<String> source = Flux.just("A", "B", "C");
// flatMap each letter to a Flux of two letters (like "A" → ["A1", "A2"])
Flux<String> flattened = source.flatMap(letter -> Flux.just(letter + "1", letter + "2"));
flattened.subscribe(System.out::println);
What does "flattening" mean in flatMap?
When you call flatMap, it maps each item to a new Publisher (Mono/Flux), and then flattens those inner Publishers into the main stream.
Imagine:
- You start with Mono<T> or Flux<T>.
- Each T maps to a Publisher<R> (Mono or Flux).
- Instead of ending up with nested Publishers (Mono<Mono<R>> or Flux<Flux<R>>), Reactor "flattens" them into Mono<R> or Flux<R>.
Flux<T> --flatMap--> Flux<Publisher<R>> ---flatten---> Flux<R> Mono<T> --flatMap--> Mono<Publisher<R>> ---flatten---> Mono<R>
Async Example Without Explicit subscribeOn
Flux<String> source = Flux.just("A", "B", "C");
// Simulate async operation using delayElements
Flux<String> flattened = source.flatMap(letter ->
Flux.just(letter + "1", letter + "2").delayElements(Duration.ofMillis(100))
);
flattened.subscribe(result ->
System.out.println("Received: " + result + " on " + Thread.currentThread().getName())
);
What's Happening?
- The outer Flux is processed synchronously.
-
The inner Flux has
delayElements, which schedules its elements on the default timer scheduler in Reactor (async). -
This introduces asynchrony, even though we didn't call
subscribeOn.
Output:
Received: A1 on reactor-timer_1 Received: A2 on reactor-timer_1 Received: B1 on reactor-timer_1 Received: B2 on reactor-timer_1 Received: C1 on reactor-timer_1 Received: C2 on reactor-timer_1
Why Async Without Explicit Threading?
-
Reactor supports asynchronous operators (like
delayElements,fromFuture, etc.) that internally use Schedulers. -
Even if you don't specify
subscribeOn, these operators introduce scheduling (e.g., Reactor'stimerscheduler fordelayElements). - So, flatMap's inner Publishers can be async if the operation inside is async.
| Feature | map | flatMap |
|---|---|---|
| Sync/Async | Synchronous | Asynchronous or nested |
| Transformation | One-to-one | One-to-many (flattens inner Publisher) |
| Threading | Same thread | Depends on inner publisher's thread |
| Use case | Simple transformation | Chaining async/complex calls |
๐ 1️⃣ What is timeout in Reactor?
timeout is an operator used in
Project Reactor (Mono/Flux) to specify the
maximum time you're willing to wait for an item from the
upstream Publisher.
- If the upstream doesn't emit within the specified duration, it terminates the sequence with a TimeoutException (or can optionally switch to a fallback sequence).
๐ Analogy:
Imagine you're waiting for a pizza delivery. You say:
"I'll wait 30 minutes. If it's not here by then, I'll order from a different
place."
2️⃣ Basic Usage of timeout
Example 1: Simple Timeout
Flux<String> slowFlux = Flux.just("A", "B", "C")
.delayElements(Duration.ofSeconds(2));
slowFlux
.timeout(Duration.ofSeconds(1)) // If any element takes >1s, error
.subscribe(
item -> System.out.println("Received: " + item),
err -> System.err.println("Error: " + err)
);
Explanation:
delayElements(2s)delays each item by 2 seconds.timeout(1s)expects each element within 1 second.- If any item is delayed beyond 1s, it triggers TimeoutException.
3️⃣ Timeout With Fallback
slowFlux
.timeout(Duration.ofSeconds(1), Flux.just("Fallback", "Default"))
.subscribe(
item -> System.out.println("Received: " + item),
err -> System.err.println("Error: " + err)
);
๐ Now, if a timeout occurs:
-
Instead of error, the stream switches to
Flux.just("Fallback", "Default").
How Does It Work Under the Hood?
-
timeoutinternally sets up a timer (Reactor uses aScheduler, usuallyparallelortimer). -
If an item does not arrive in time, it:
- Either emits
TimeoutException(default behavior). - Or cancels the upstream and switches to a fallback Publisher (if provided).
- Either emits
๐ Threading: The timeout timer runs on a separate scheduler (usually timer or parallel), not blocking the main thread.
| Operator | Description |
|---|---|
timeout(Duration) |
Waits max Duration for next item, errors on timeout. |
timeout(Duration, Publisher fallback) |
On timeout, switch to fallback Publisher instead of error.
|
timeout(Publisher firstTimeout) |
Timeout is determined by emissions from firstTimeout.
|
timeout(Publisher first, Function) |
Dynamically control timeouts per item. |
Example: Dynamic Timeout Per Item
Flux<String> items = Flux.just("A", "B", "C");
// Each item has a custom timeout: "A" gets 1s, "B" gets 2s, etc.
items
.timeout(
item -> {
if (item.equals("A")) return Mono.delay(Duration.ofSeconds(1));
else return Mono.delay(Duration.ofSeconds(2));
}
)
.subscribe(
System.out::println,
err -> System.err.println("Error: " + err)
);
⚠️ 6️⃣ Common Pitfalls
1️⃣ Timeout Is Per Item:
Timeout applies to each item individually. So even if the
stream emits items regularly, any item that takes too long triggers timeout.
2️⃣ Scheduler Overhead:
Each timeout sets up a timer. For very high-frequency streams,
too many timers can cause overhead.
3️⃣ Fallback Must Be Reactive:
If you use fallback, it must be a Publisher, not a static
value.
| Concept | Key Points |
|---|---|
| timeout | Sets max wait time for each item. Emits error or switches fallback. |
| Fallback | Optional Publisher to continue on timeout instead of error. |
| Dynamic timeout | Custom timeout per item using Function. |
| Threading | Uses internal Scheduler (e.g., timer) for managing timeout logic. |
| Use Cases | DB queries, WebClient calls, external systems with variable response times. |
๐ฅ Key onError Methods:
| Method | Purpose | Example |
|---|---|---|
| onErrorReturn(T fallback) | Return a fallback value when an error occurs | .onErrorReturn("Fallback value") |
| onErrorResume(Function) | Switch to a fallback Publisher (Mono/Flux) when an error occurs | .onErrorResume(e -> Flux.just("Fallback")) |
| onErrorMap(Function) | Transform the error into another error (e.g., wrapping) | .onErrorMap(e -> new CustomException(e)) |
| onErrorContinue(BiConsumer) | Log or skip the error and continue with the next item | .onErrorContinue((e, item) -> log.warn(...)) |
| doOnError(Consumer) | Side-effect: perform an action on error but don't recover | .doOnError(e -> log.error("Error: " + e)) |
⚠️ onErrorReturn vs onErrorResume
-
onErrorReturn: emits one fallback value and terminates. -
onErrorResume: switches to a new sequence (Flux/Mono) and continues.
2️⃣ subscribe() Overloads in Reactor
๐ subscribe() in Project Reactor
In Reactor,
nothing happens until you call subscribe(). It is a terminal operation that starts the data flow in
the reactive pipeline (aka subscription to the Publisher).
subscribe();
subscribe(Consumer<? super T> consumer);
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
✅ 1. subscribe()
- Use Case: Fire-and-forget (e.g., logging, debugging, testing).
- No way to observe emitted values or errors.
Flux.just("A", "B", "C")
.map(String::toLowerCase)
.subscribe();
✅ 2. subscribe(Consumer<T> onNext)
- Use Case: You care about the data but not errors or completion.
- Best for simple flows like logging results.
Mono.just("Hello")
.subscribe(data -> System.out.println("Received: " + data));
✅ 3. subscribe(onNext, onError)
- Use Case: Want to handle data and errors.
- Recommended for robust code.
Mono.just("A")
.map(s -> { throw new RuntimeException("Boom!"); })
.subscribe(
data -> System.out.println("Received: " + data),
error -> System.err.println("Error: " + error.getMessage())
);
✅ 4. subscribe(onNext, onError, onComplete)
- Use Case: You want to know when the stream completes successfully.
Flux.range(1, 3)
.subscribe(
data -> System.out.println("Got: " + data),
error -> System.err.println("Error: " + error),
() -> System.out.println("Stream completed.")
);
✅ 5. subscribe(onNext, onError, onComplete, onSubscribe)
- Use Case: Full control — including backpressure (you can control how many items to request).
-
The
onSubscribegives access to theSubscription.
Flux.range(1, 10)
.subscribe(
data -> System.out.println("Item: " + data),
err -> System.err.println("Error: " + err),
() -> System.out.println("Done"),
subscription -> {
System.out.println("Subscribed!");
subscription.request(5); // Request only 5 items
}
);
๐ ️ When to Use What?
| Scenario | Recommended subscribe Variant |
|---|---|
| You don't care about results | subscribe() |
| You want only data | subscribe(onNext) |
| You want to handle data + errors | subscribe(onNext, onError) |
| You want to handle completion | subscribe(onNext, onError, onComplete) |
| You want fine control (e.g., backpressure) | subscribe(onNext, onError, onComplete, onSubscribe) |
๐ block() vs subscribe()
| Feature | subscribe() |
block() |
|---|---|---|
| Type | Non-blocking | Blocking |
| Use in | Reactive chains (preferred) | Testing, bridging to sync |
| Thread | Doesn't block calling thread | Blocks current thread |
๐ฆ Real-World Use Cases
| Use Case | Approach |
|---|---|
| Logging events | subscribe(event -> log.info("Event: {}", event)) |
| REST controller (WebFlux) |
Return Mono/Flux, no need to call
subscribe() manually
|
| Side-effects (e.g., DB insert) | subscribe() with proper error/complete handling |
| Integration test (wait result) | Use .block() |
| Controlled data consumption | Use 4-arg subscribe() with backpressure |
⏱️ elapsed() — What It Does
The .elapsed() operator
measures the time interval between when a value is requested
and when it is emitted. It transforms each item into a
Tuple2<Long, T>:
-
Long— time in milliseconds since the previous item was emitted (or since subscription for the first item). T— the original value.
๐ Use Case
Use .elapsed() to:
- Debug performance or latency issues
- Profile slow publishers
- Measure time gaps between emissions
- Observe delays in streams
Flux.just("A", "B", "C")
.delayElements(Duration.ofMillis(300))
.elapsed()
.subscribe(tuple -> System.out.println("Elapsed: " + tuple.getT1() + " ms, Value: " + tuple.getT2()));
⛏️ Breakdown of .elapsed() Internals:
-
Under the hood, it uses
System.nanoTime()to calculate time difference between signals. - The first item is timed from the moment of subscription.
- Works with Flux or Mono (single value).
๐ Real-World Example
Measure API call time:
Mono.fromCallable(() -> makeNetworkCall())
.elapsed()
.subscribe(tuple -> {
System.out.println("API call took: " + tuple.getT1() + " ms, Response: " + tuple.getT2());
});
Caveats
| Limitation | Explanation |
|---|---|
| Time is per item | Measures between item emissions, not total processing time. |
| Doesn't work well without delays |
If items are emitted too fast (e.g. Flux.range(1, 5)),
elapsed will be 0 ms for all.
|
| Can add minor overhead | Should not be used in high-performance paths unless needed. |
❓ Why do we need doOnNext, doOnError,
doOnComplete if we already have subscribe()?
๐ค Quick Answer:
Because subscribe() is the terminal operation —
it starts the reactive pipeline.
But the doOn... methods are
non-terminal side-effect hooks used for
logging, metrics, debugging, auditing, tracing, etc., without
changing the stream.
๐ง Conceptual Difference
| Feature | doOn... (e.g., doOnNext) |
subscribe(...) |
|---|---|---|
| Purpose | Add side effects while stream flows | Consume the stream and trigger execution |
| Type | Non-terminal (intermediate operator) | Terminal operator |
| Returns | Still a Mono or Flux |
Returns a Disposable |
| Use case | Logging, tracing, metrics, hooks | Actual consumption of data |
✅ Example: Difference in use
Flux.just("A", "B", "C")
.doOnNext(item -> System.out.println("doOnNext: " + item)) // side effect
.doOnComplete(() -> System.out.println("doOnComplete")) // side effect
.subscribe(item -> System.out.println("subscribe: " + item)); // data consumption
Output:
doOnNext: A subscribe: A doOnNext: B subscribe: B doOnNext: C subscribe: C doOnComplete
Common doOn... Methods and Use Cases
| Method | Triggered When... | Typical Use |
|---|---|---|
doOnSubscribe() |
Subscription begins | Log subscription, add tracing context |
doOnNext() |
Item is emitted | Logging, debug, metrics |
doOnComplete() |
All items successfully emitted | Notify, log, clean-up |
doOnError() |
Error is emitted | Log error, metrics, retry logic hooks |
doOnCancel() |
Subscription is cancelled | Clean-up, audit |
doOnTerminate() |
Stream terminates (complete or error) | Unified logging or cleanup |
doFinally() |
Always called last (complete/error/cancel) | Final clean-up, like finally in Java |
❗ Why not just do everything in subscribe(...)?
.subscribe(
data -> { log.debug("Got item {}", data); sendToDB(data); },
error -> { log.error("Error", error); notifyMonitoring(); },
() -> log.info("Done")
);
✅ You can — but it mixes
side-effects with business logic
❌ Harder to test and reason about
✅ Better to use doOnNext() for logging and keep
subscribe() clean and purpose-focused.
✅ Best Practice: Use doOn... for Observability,
subscribe() for Execution
This separation makes your pipelines:
- Easier to read
- More composable and testable
- Cleaner for production instrumentation (e.g., Micrometer, tracing)
๐งช Advanced Tip: You can chain multiple doOnNext() calls!
flux
.doOnNext(item -> log.debug("Stage 1: {}", item))
.map(this::transform)
.doOnNext(item -> log.debug("Stage 2: {}", item))
Each one sees the stream at that point in the chain, making them perfect for stage-level inspection
✅ Rule of Thumb:
| Task | Use This |
|---|---|
| Logging | doOnNext, doOnError |
| Metrics / Tracing | doOnNext, doOnComplete |
| Side effects (non-intrusive) | doOn... methods |
| Business logic / API calls |
map, flatMap, concatMap,
switchMap
|
| Terminal consumer | subscribe() |
๐ Example with both logging and API call:
Flux<String> ids = Flux.just("X", "Y", "Z");
ids.doOnNext(id -> log.debug("Processing ID: {}", id)) // logging side effect
.flatMap(id -> callApi(id)) // API call
.map(response -> transform(response)) // transformation
.doOnNext(result -> log.debug("Transformed: {}", result)) // log result
.subscribe(finalResult -> sendToNextService(finalResult));
๐ง 1. contextCapture() — Bridge for context propagation
๐ What it does:
It captures the Reactor context at the point it's called and ensures it's available downstream, especially across thread boundaries.
๐ฆ Use Case:
You've added some metadata (like requestId) in the
Context, and want to ensure it's available inside
doOnNext, flatMap, etc.
All Together
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import java.time.Duration;
public class ReactiveProductionExample {
public static void main(String[] args) {
Flux.defer(() -> Flux.just("request-payload","request-payload1","request-payload2"))
// 1️⃣ SubscribeOn sets the thread where subscription (upstream) starts
.subscribeOn(Schedulers.boundedElastic())
// 2️⃣ Setup an artificial delay or API call with timeout
.flatMap(data -> simulateExternalApi(data)
.elapsed() // 3️⃣ elapse - for measuring latency
.doOnNext(tuple -> System.out.println("API Time taken: " + tuple.getT1() + "ms"))
.map(tuple -> tuple.getT2()) // Extract the actual data
.timeout(Duration.ofMillis(500)) // 4️⃣ timeout protection
.onErrorResume(error -> fallback(data, error)) // 5️⃣ graceful fallback
)
// 6️⃣ PublishOn switches downstream flow to new thread (e.g., CPU intensive)
.publishOn(Schedulers.parallel())
// 7️⃣ doOn methods for side effects — logging, metrics (not modifying flow)
.doOnNext(result -> System.out.println("[Result Ready]: " + result))
.doOnError(err -> System.err.println("[ERROR]: " + err.getMessage()))
.doOnSubscribe(subscription -> System.out.println("[Subscribed]"))
.doOnComplete(() -> System.out.println("[Success]"))
// 8️⃣ contextWrite for logging/tracing IDs
.contextWrite(Context.of("requestId", "REQ-123"))
// 9️⃣ Finally subscribe to trigger the chain
.subscribe(
data -> System.out.println("Final data: " + data),
error -> System.err.println("Terminal Error: " + error),
() -> System.out.println("Done")
);
// Sleep to allow async threads to complete
try { Thread.sleep(2000); } catch (Exception e) {}
}
static Mono<String> simulateExternalApi(String data) {
return Mono.fromCallable(() -> {
Thread.sleep(300); // Simulate delay
return "response-for-" + data;
}).subscribeOn(Schedulers.boundedElastic());
}
static Mono<String> fallback(String input, Throwable error) {
System.err.println("[Fallback triggered for: " + input + "]");
return Mono.just("fallback-for-" + input);
}
}
Output:
[Subscribed] API Time taken: 303ms [Result Ready]: response-for-request-payload Final data: response-for-request-payload API Time taken: 303ms API Time taken: 305ms [Result Ready]: response-for-request-payload1 Final data: response-for-request-payload1 [Result Ready]: response-for-request-payload2 Final data: response-for-request-payload2 [Success] Done
Comments
Post a Comment