1️⃣ subscribeOn – "Decides WHERE the Pipeline Starts"
Definition:
- subscribeOn influences the thread on which the data source (upstream) runs, e.g., data generation or API calls.
- It affects the source and everything downstream (until a publishOn switches it).
    Flux<Integer> flux = Flux.range(1, 3)
        .doOnNext(i -> System.out.println("[Generating] " + i + " on " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.boundedElastic()) // Change starting thread
        .map(i -> {
            System.out.println("[Processing] " + i + " on " + Thread.currentThread().getName());
            return i * 10;
        });
    flux.blockLast();
Output:
    [Generating] 1 on boundedElastic-1
    [Processing] 1 on boundedElastic-1
    [Generating] 2 on boundedElastic-1
    [Processing] 2 on boundedElastic-1
    [Generating] 3 on boundedElastic-1
    [Processing] 3 on boundedElastic-1
Key Insight:
- subscribeOn affects the beginning (source) and flows down.
- subscribeOn doesn't just switch one stage; it affects everything downstream unless overridden by publishOn.
2️⃣ publishOn – "Switches the Thread at a Certain Point"
Definition:
- publishOn switches the thread context at the point where it appears.
- It affects only downstream operators.
    Flux<Integer> flux = Flux.range(1, 3)
        .doOnNext(i -> System.out.println("[Generating] " + i + " on " + Thread.currentThread().getName()))
        .publishOn(Schedulers.parallel()) // Switch thread here
        .map(i -> {
            System.out.println("[Processing] " + i + " on " + Thread.currentThread().getName());
            return i * 10;
        });
    flux.blockLast();
Output:
    [Generating] 1 on main
    [Generating] 2 on main
    [Generating] 3 on main
    [Processing] 1 on parallel-1
    [Processing] 2 on parallel-1
    [Processing] 3 on parallel-1
Key Insight:
- publishOn switches the thread only from where it's placed.
- Upstream (before publishOn) stays on the original thread (e.g., main).
Combining Both: Full Example
    Flux<Integer> flux = Flux.range(1, 3)
        .doOnNext(i -> System.out.println("[Source] " + i + " on " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.boundedElastic()) // Run source on boundedElastic
        .publishOn(Schedulers.parallel()) // Switch context for downstream
        .map(i -> {
            System.out.println("[Processing] " + i + " on " + Thread.currentThread().getName());
            return i * 10;
        })
        .publishOn(Schedulers.single()) // Switch context again
        .doOnNext(i -> System.out.println("[Final] " + i + " on " + Thread.currentThread().getName()));
    flux.blockLast();
Output:
    [Source] 1 on boundedElastic-1
    [Source] 2 on boundedElastic-1
    [Source] 3 on boundedElastic-1
    [Processing] 1 on parallel-1
    [Processing] 2 on parallel-1
    [Processing] 3 on parallel-1
    [Final] 10 on single-1
    [Final] 20 on single-1
    [Final] 30 on single-1
Feature | subscribeOn | publishOn |
---|---|---|
Thread Affected | Affects source and downstream | Affects only downstream from where used |
Usage | To control source/initial thread | To switch context mid-pipeline |
Common Use Case | I/O sources (DB, network calls) | Switching from CPU-bound to I/O or vice versa |
Placement Impact | Works wherever it is, but affects source | Works where it is, affects downstream |
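The placement rules in the table can be modelled with nothing but the JDK. The sketch below is not Reactor code: it is a plain java.util.concurrent analogy in which supplyAsync on one executor stands in for subscribeOn (where the source runs) and thenApplyAsync on a second executor stands in for publishOn (where downstream work continues). The class name, the pool names and the named() helper are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy (not Reactor): two named executors play the roles of
// the subscribeOn scheduler and the publishOn scheduler.
class ThreadHopSketch {

    // Hypothetical helper: a single-thread executor whose thread carries the given name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    // Returns [thread that ran the source, thread that ran the downstream step].
    static List<String> run() {
        ExecutorService sourcePool = named("source-pool");
        ExecutorService workerPool = named("worker-pool");
        try {
            return CompletableFuture
                // "subscribeOn": the source itself runs on sourcePool
                .supplyAsync(() -> Thread.currentThread().getName(), sourcePool)
                // "publishOn": the step after this point runs on workerPool
                .thenApplyAsync(src -> List.of(src, Thread.currentThread().getName()), workerPool)
                .join();
        } finally {
            sourcePool.shutdown();
            workerPool.shutdown();
        }
    }
}
```

Calling ThreadHopSketch.run() gives the source thread name first and the downstream thread name second, which mirrors the "source on one scheduler, downstream on another" split described above.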
1️⃣ What is defer in Reactor?
Definition:
- Mono.defer or Flux.defer delays the creation of the actual Publisher (Mono/Flux) until a subscriber subscribes.
- Every subscriber gets a new instance of the Publisher, which is created lazily.
This is especially useful when you:
- Want to wrap a method call (like DB access, event publishing, etc.).
- Want to ensure the call happens only when subscribed (and not during definition of the pipeline).
- Want to control threading behavior (as subscribeOn affects everything from the point of subscription).
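The difference between eager and deferred creation can be shown without Reactor at all. In this minimal sketch a java.util.function.Supplier plays the role of Mono.defer; DeferSketch, loadFromDb and the dbCalls counter are hypothetical names used only to make the timing of the call visible.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-JDK analogy (not Reactor): a Supplier plays the role of Mono.defer.
class DeferSketch {
    static final AtomicInteger dbCalls = new AtomicInteger();

    // Hypothetical stand-in for a database call; counts how often it runs.
    static String loadFromDb() {
        return "Data#" + dbCalls.incrementAndGet();
    }

    // Eager: loadFromDb() runs right here, while the "pipeline" is being defined.
    static Supplier<String> eager() {
        String value = loadFromDb();
        return () -> value;
    }

    // Deferred: loadFromDb() runs only when get() is called, once per caller.
    static Supplier<String> deferred() {
        return () -> loadFromDb();
    }
}
```

With the eager variant the counter is already at 1 before anyone asks for a value, and every caller sees the same result. With the deferred variant nothing runs until get() is called, and each call triggers a fresh invocation, which is the "new instance per subscriber" behavior described above.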
2️⃣ Example: Without defer
    public Mono<String> getDataFromDb() {
        System.out.println("Calling DB on thread: " + Thread.currentThread().getName());
        return Mono.just("Data");
    }

    Mono<String> result = getDataFromDb()
        .subscribeOn(Schedulers.boundedElastic())
        .doOnNext(data -> System.out.println("Got: " + data + " on " + Thread.currentThread().getName()));
    result.subscribe();
Output:
    Calling DB on thread: main
    Got: Data on boundedElastic-1
✅ Observation:
- getDataFromDb() is called immediately, when defining result.
- Even though subscribeOn is used, the method is called on the main thread.
3️⃣ Example: With defer
    Mono<String> result = Mono.defer(() -> {
            System.out.println("Calling DB on thread: " + Thread.currentThread().getName());
            return Mono.just("Data");
        })
        .subscribeOn(Schedulers.boundedElastic())
        .doOnNext(data -> System.out.println("Got: " + data + " on " + Thread.currentThread().getName()));
    result.subscribe();
Output:
    Calling DB on thread: boundedElastic-1
    Got: Data on boundedElastic-1
Observation:
- Now, defer delays the DB call until subscription.
- Because subscribeOn controls the thread where subscription happens, the DB call is made on the boundedElastic thread.
4️⃣ defer + publishOn – What Happens?
    Mono<String> result = Mono.defer(() -> {
            System.out.println("Calling DB on thread: " + Thread.currentThread().getName());
            return Mono.just("Data");
        })
        .subscribeOn(Schedulers.boundedElastic()) // Controls where subscription & upstream happen
        .publishOn(Schedulers.parallel()) // Switch thread for downstream
        .map(data -> {
            System.out.println("Processing on thread: " + Thread.currentThread().getName());
            return data.toUpperCase();
        })
        .doOnNext(data -> System.out.println("Final: " + data + " on " + Thread.currentThread().getName()));
    result.subscribe();
Output:
    Calling DB on thread: boundedElastic-1
    Processing on thread: parallel-1
    Final: DATA on parallel-1
✅ Observation:
- defer ensures the DB call is on boundedElastic (because of subscribeOn).
- publishOn switches the thread after the DB call (for downstream processing).
Concept | Meaning |
---|---|
defer | Wraps a supplier so it's called lazily at subscription time, not pipeline definition. |
subscribeOn | Controls thread of subscription & upstream (including defer calls). |
publishOn | Switches thread from that point downstream. |
Schedulers in Reactor: Deep Dive
Schedulers are the backbone of threading in Reactor. They control where and how your reactive pipeline's tasks run.
Let's start by listing common Reactor Schedulers:
Schedulers.immediate()
Schedulers.single()
Schedulers.parallel()
Schedulers.boundedElastic()
Schedulers.newSingle(name)
Schedulers.fromExecutorService(executor)
1️⃣ Schedulers.immediate()
Definition:
Runs tasks on the current thread (no thread switch).
Use Case:
- When you don't want threading overhead (for example, when already on a background thread).
- For debugging or very lightweight tasks.
    Mono.just("data")
        .subscribeOn(Schedulers.immediate())
        .doOnNext(d -> System.out.println("Immediate: " + Thread.currentThread().getName()))
        .subscribe();
Output:
Immediate: main
2️⃣ Schedulers.single()
Definition:
A single-threaded scheduler backed by one thread (shared across all usages).
Use Case:
- Sequential task processing (e.g., writing logs, file writes, or certain transactional operations).
- Avoids concurrency issues, because every task runs on the same thread.
    Mono.just("single-threaded")
        .subscribeOn(Schedulers.single())
        .doOnNext(d -> System.out.println("Single: " + Thread.currentThread().getName()))
        .subscribe();
Output:
    Single: single-1
3️⃣ Schedulers.parallel()
Definition:
A fixed-size thread pool, typically based on the number of CPU cores (Runtime.getRuntime().availableProcessors()).
Use Case:
- CPU-bound tasks (e.g., intensive calculations, transformation logic).
- Leverage multiple cores for parallelism.
    Flux.range(1, 5)
        .parallel() // Makes parallel flux
        .runOn(Schedulers.parallel())
        .doOnNext(i -> System.out.println("Parallel: " + i + " on " + Thread.currentThread().getName()))
        .sequential()
        .subscribe();
Output (thread numbers depend on the number of cores):
    Parallel: 1 on parallel-1
    Parallel: 2 on parallel-2
    Parallel: 3 on parallel-1
    Parallel: 4 on parallel-2
    Parallel: 5 on parallel-1
4️⃣ Schedulers.boundedElastic()
Definition:
An elastic thread pool with limits. Creates threads as needed (like newCachedThreadPool) but bounded to avoid resource exhaustion. By default the cap is 10 times the number of CPU cores, and idle threads are released after 60 seconds.
Use Case:
- I/O-bound operations:
  - Blocking DB calls (JDBC)
  - File reads/writes
  - Calling external APIs
- When the number of concurrent threads could be large but must stay controlled.
    Mono.fromCallable(() -> {
            Thread.sleep(1000); // Simulate blocking call
            return "Elastic data";
        })
        .subscribeOn(Schedulers.boundedElastic())
        .doOnNext(d -> System.out.println("Elastic: " + d + " on " + Thread.currentThread().getName()))
        .subscribe();
Output:
    Elastic: Elastic data on boundedElastic-1
5️⃣ Schedulers.newSingle("custom")
Definition:
Creates a new single-threaded Scheduler with a custom thread name.
Use Case:
- When you need an isolated thread for a specific purpose (e.g., a dedicated logger or a sensitive task).
    Scheduler myScheduler = Schedulers.newSingle("my-thread");
    Mono.just("custom")
        .subscribeOn(myScheduler)
        .doOnNext(d -> System.out.println("Custom: " + d + " on " + Thread.currentThread().getName()))
        .subscribe();
Output:
    Custom: custom on my-thread-1
6️⃣ Schedulers.fromExecutorService()
Definition:
Wraps a custom ExecutorService (e.g., a Java thread pool) into a Scheduler.
Use Case:
- When integrating with an existing thread pool (from legacy systems or custom configurations).
Example:
    ExecutorService executor = Executors.newFixedThreadPool(4);
    Scheduler customScheduler = Schedulers.fromExecutorService(executor);
    Mono.just("custom-executor")
        .subscribeOn(customScheduler)
        .doOnNext(d -> System.out.println("Executor: " + d + " on " + Thread.currentThread().getName()))
        .subscribe();
Scheduler | Threads | Use Case |
---|---|---|
immediate | Current thread | No threading |
single | 1 shared thread | Sequential tasks |
parallel | CPU cores | CPU-bound parallel tasks |
boundedElastic | Dynamic, bounded pool | I/O-bound, blocking tasks |
newSingle("name") | 1 new thread | Isolated, named thread |
fromExecutorService | Custom pool | Integrate legacy thread pools |
map vs flatMap in Reactor: Detailed Breakdown
1️⃣ map()
What is it?
- Synchronous transformation of data.
- It maps (transforms) one value to another in a one-to-one fashion.
- Input type → Output type transformation.
Key Points:
- Takes a Function<T, R>: input type T to output type R.
- Runs in the same thread where it's called (no async switch).
- Great for simple data transformations (e.g., uppercasing a string, adding a number).
    Mono<String> mono = Mono.just("hello");
    mono.map(s -> s.toUpperCase()) // Transform "hello" → "HELLO"
        .subscribe(result -> System.out.println("Result: " + result));
Output:
Result: HELLO
2️⃣ flatMap()
What is it?
- Asynchronous or nested transformation.
- It maps a value to a Publisher (Mono or Flux), and then flattens the result.
- Useful for chaining async calls (like DB access, web calls).
Key Points:
- Takes a Function<T, Publisher<R>>.
- It flattens nested streams into a single stream.
- Often used for asynchronous workflows where each step depends on the result of the previous one.
Example:
    Mono<String> mono = Mono.just("user123");
    // Simulate async DB call
    mono.flatMap(id -> Mono.just("User Name for " + id))
        .subscribe(result -> System.out.println("Result: " + result));
Output:
Result: User Name for user123
Flattening Example (No Async, No Threading)
    Flux<String> source = Flux.just("A", "B", "C");
    // flatMap each letter to a Flux of two letters (like "A" → ["A1", "A2"])
    Flux<String> flattened = source.flatMap(letter -> Flux.just(letter + "1", letter + "2"));
    flattened.subscribe(System.out::println);
What does "flattening" mean in flatMap?
When you call flatMap, it maps each item to a new Publisher (Mono/Flux), and then flattens those inner Publishers into the main stream.
Imagine:
- You start with Mono<T> or Flux<T>.
- Each T maps to a Publisher<R> (Mono or Flux).
- Instead of ending up with nested Publishers (Mono<Mono<R>> or Flux<Flux<R>>), Reactor "flattens" them into Mono<R> or Flux<R>.
    Flux<T> --flatMap--> Flux<Publisher<R>> ---flatten---> Flux<R>
    Mono<T> --flatMap--> Mono<Publisher<R>> ---flatten---> Mono<R>
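The same nesting-versus-flattening distinction exists in java.util.stream, which makes it easy to see in isolation. The sketch below is a plain-JDK analogy, not Reactor code, and FlattenSketch is a made-up class name. One difference to keep in mind: a sequential Stream always keeps source order, whereas Reactor's flatMap may interleave results from asynchronous inner Publishers.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-JDK analogy (not Reactor): Stream.map vs Stream.flatMap.
class FlattenSketch {

    // map keeps the nesting: one inner list per input element.
    static List<List<String>> nested() {
        return Stream.of("A", "B", "C")
            .map(letter -> List.of(letter + "1", letter + "2"))
            .collect(Collectors.toList());
    }

    // flatMap merges the inner streams into one flat sequence.
    static List<String> flattened() {
        return Stream.of("A", "B", "C")
            .flatMap(letter -> Stream.of(letter + "1", letter + "2"))
            .collect(Collectors.toList());
    }
}
```

nested() yields three two-element lists, while flattened() yields a single list of six strings.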
Async Example Without Explicit subscribeOn
    Flux<String> source = Flux.just("A", "B", "C");
    // Simulate async operation using delayElements
    Flux<String> flattened = source.flatMap(letter ->
        Flux.just(letter + "1", letter + "2").delayElements(Duration.ofMillis(100))
    );
    flattened.subscribe(result ->
        System.out.println("Received: " + result + " on " + Thread.currentThread().getName())
    );
What's Happening?
- The outer Flux is processed synchronously.
- The inner Flux has delayElements, which by default schedules its elements on Schedulers.parallel() (async).
- This introduces asynchrony, even though we didn't call subscribeOn.
- flatMap subscribes to all inner Publishers eagerly, so their elements interleave instead of arriving grouped by letter.
Output (one possible run; the order within each batch and the thread numbers vary):
    Received: A1 on parallel-1
    Received: B1 on parallel-2
    Received: C1 on parallel-3
    Received: A2 on parallel-4
    Received: B2 on parallel-5
    Received: C2 on parallel-6
Why Async Without Explicit Threading?
- Reactor has asynchronous operators (like delayElements, fromFuture, etc.) that deliver signals on other threads without any explicit scheduler call.
- Even if you don't specify subscribeOn, time-based operators introduce scheduling (delayElements uses Schedulers.parallel() by default).
- So, flatMap's inner Publishers can be async if the operation inside is async.
Feature | map | flatMap |
---|---|---|
Sync/Async | Synchronous | Asynchronous or nested |
Transformation | One-to-one | One-to-many (flattens inner Publisher) |
Threading | Same thread | Depends on inner publisher's thread |
Use case | Simple transformation | Chaining async/complex calls |
1️⃣ What is timeout in Reactor?
timeout is an operator used in Project Reactor (Mono/Flux) to specify the maximum time you're willing to wait for an item from the upstream Publisher.
- If the upstream doesn't emit within the specified duration, it terminates the sequence with a TimeoutException (or can optionally switch to a fallback sequence).
Analogy:
Imagine you're waiting for a pizza delivery. You say:
"I'll wait 30 minutes. If it's not here by then, I'll order from a different place."
2️⃣ Basic Usage of timeout
Example 1: Simple Timeout
    Flux<String> slowFlux = Flux.just("A", "B", "C")
        .delayElements(Duration.ofSeconds(2));
    slowFlux
        .timeout(Duration.ofSeconds(1)) // If any element takes >1s, error
        .subscribe(
            item -> System.out.println("Received: " + item),
            err -> System.err.println("Error: " + err)
        );
Explanation:
- delayElements(2s) delays each item by 2 seconds.
- timeout(1s) expects each element within 1 second.
- If any item is delayed beyond 1s, it triggers TimeoutException.
3️⃣ Timeout With Fallback
    slowFlux
        .timeout(Duration.ofSeconds(1), Flux.just("Fallback", "Default"))
        .subscribe(
            item -> System.out.println("Received: " + item),
            err -> System.err.println("Error: " + err)
        );
Now, if a timeout occurs:
- Instead of an error, the stream switches to Flux.just("Fallback", "Default").
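The two behaviors, error on timeout and fallback on timeout, have close counterparts in the JDK's CompletableFuture (Java 9+), which can serve as a small model. This is a plain-JDK analogy and not Reactor code; TimeoutSketch and slowCall are hypothetical names, and the delays are arbitrary values chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK analogy (not Reactor): orTimeout ~ timeout(Duration),
// completeOnTimeout ~ timeout(Duration, fallback) with a single fallback value.
class TimeoutSketch {

    // Hypothetical slow call: produces its value only after delayMillis.
    static CompletableFuture<String> slowCall(long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "real value";
        });
    }

    // Fails with a TimeoutException if the value is late.
    static String withError(long delayMillis, long limitMillis) {
        try {
            return slowCall(delayMillis).orTimeout(limitMillis, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException ? "timed out" : "failed";
        }
    }

    // Substitutes a fallback value if the real one is late.
    static String withFallback(long delayMillis, long limitMillis) {
        return slowCall(delayMillis)
            .completeOnTimeout("Fallback", limitMillis, TimeUnit.MILLISECONDS)
            .join();
    }
}
```

With a 500 ms call and a 50 ms limit, withError reports the timeout and withFallback returns "Fallback"; when the call finishes inside the limit, both return the real value.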
How Does It Work Under the Hood?
- timeout internally sets up a timer on a Scheduler (Schedulers.parallel() by default; a different Scheduler can be passed as an extra argument).
- If an item does not arrive in time, it cancels the upstream and then either:
  - emits a TimeoutException (default behavior), or
  - switches to a fallback Publisher (if provided).
Threading: The timeout timer runs on that scheduler, so it does not block the main thread.
Operator | Description |
---|---|
timeout(Duration) | Waits max Duration for next item, errors on timeout. |
timeout(Duration, Publisher fallback) | On timeout, switch to fallback Publisher instead of error. |
timeout(Publisher firstTimeout) | Errors if the first item has not arrived before firstTimeout emits. |
timeout(Publisher first, Function) | Dynamically control timeouts per item. |
Example: Dynamic Timeout Per Item
    Flux<String> items = Flux.just("A", "B", "C");
    // The first argument bounds the wait for the first item (1s here).
    // The function receives each emitted item and returns a Publisher that
    // bounds the wait for the item after it: 1s after "A", 2s otherwise.
    items
        .timeout(
            Mono.delay(Duration.ofSeconds(1)),
            item -> item.equals("A")
                ? Mono.delay(Duration.ofSeconds(1))
                : Mono.delay(Duration.ofSeconds(2))
        )
        .subscribe(
            System.out::println,
            err -> System.err.println("Error: " + err)
        );
⚠️ 6️⃣ Common Pitfalls
1️⃣ Timeout Is Per Item:
Timeout applies to each item individually. So even if the stream emits items regularly, any item that takes too long triggers timeout.
2️⃣ Scheduler Overhead:
Each timeout sets up a timer. For very high-frequency streams, too many timers can cause overhead.
3️⃣ Fallback Must Be Reactive:
If you use fallback, it must be a Publisher, not a static value.
Concept | Key Points |
---|---|
timeout | Sets max wait time for each item. Emits error or switches fallback. |
Fallback | Optional Publisher to continue on timeout instead of error. |
Dynamic timeout | Custom timeout per item using Function . |
Threading | Uses internal Scheduler (e.g., timer) for managing timeout logic. |
Use Cases | DB queries, WebClient calls, external systems with variable response times. |
Key onError Methods:
Method | Purpose | Example |
---|---|---|
onErrorReturn(T fallback) | Return a fallback value when an error occurs | .onErrorReturn("Fallback value") |
onErrorResume(Function) | Switch to a fallback Publisher (Mono/Flux) when an error occurs | .onErrorResume(e -> Flux.just("Fallback")) |
onErrorMap(Function) | Transform the error into another error (e.g., wrapping) | .onErrorMap(e -> new CustomException(e)) |
onErrorContinue(BiConsumer) | Log or skip the error and continue with the next item | .onErrorContinue((e, item) -> log.warn(...)) |
doOnError(Consumer) | Side-effect: perform an action on error but don't recover | .doOnError(e -> log.error("Error: " + e)) |
⚠️ onErrorReturn vs onErrorResume
- onErrorReturn: emits one fallback value and then completes.
- onErrorResume: switches to a new sequence (Flux/Mono) and continues with that sequence; the original source is not resumed.
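The distinction between substituting a value and substituting a whole new source can be modelled with CompletableFuture. This is a plain-JDK analogy (Java 9+ for failedFuture), not Reactor code; ErrorRecoverySketch, failing and fallbackSource are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Plain-JDK analogy (not Reactor): exceptionally ~ onErrorReturn,
// handle + thenCompose ~ onErrorResume.
class ErrorRecoverySketch {

    // Hypothetical source that always fails.
    static CompletableFuture<String> failing() {
        return CompletableFuture.failedFuture(new IllegalStateException("Boom!"));
    }

    // Hypothetical second asynchronous source used as the fallback.
    static CompletableFuture<String> fallbackSource(Throwable error) {
        return CompletableFuture.supplyAsync(() -> "Fallback from " + error.getMessage());
    }

    // Replace the error with one fixed value.
    static String returnLike() {
        return failing().exceptionally(e -> "Fallback value").join();
    }

    // Replace the error with another asynchronous source and flatten the result.
    static String resumeLike() {
        return failing()
            .<CompletableFuture<String>>handle((value, error) ->
                error == null ? CompletableFuture.completedFuture(value) : fallbackSource(error))
            .thenCompose(next -> next)
            .join();
    }
}
```

returnLike() can only produce a constant, while resumeLike() can inspect the error and start further asynchronous work, which is the reason to pick onErrorResume over onErrorReturn.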
2️⃣ subscribe() Overloads in Reactor
subscribe() in Project Reactor
In Reactor, nothing happens until you call subscribe(). It is a terminal operation that starts the data flow in the reactive pipeline (aka subscription to the Publisher).
    subscribe();
    subscribe(Consumer<? super T> consumer);
    subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
    subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer);
    subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
✅ 1. subscribe()
- Use Case: Fire-and-forget (e.g., logging, debugging, testing).
- No way to observe emitted values or errors.
    Flux.just("A", "B", "C")
        .map(String::toLowerCase)
        .subscribe();
✅ 2. subscribe(Consumer<T> onNext)
- Use Case: You care about the data but not errors or completion.
- Best for simple flows like logging results.
    Mono.just("Hello")
        .subscribe(data -> System.out.println("Received: " + data));
✅ 3. subscribe(onNext, onError)
- Use Case: Want to handle data and errors.
- Recommended for robust code.
    Mono.just("A")
        .map(s -> { throw new RuntimeException("Boom!"); })
        .subscribe(
            data -> System.out.println("Received: " + data),
            error -> System.err.println("Error: " + error.getMessage())
        );
✅ 4. subscribe(onNext, onError, onComplete)
- Use Case: You want to know when the stream completes successfully.
    Flux.range(1, 3)
        .subscribe(
            data -> System.out.println("Got: " + data),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Stream completed.")
        );
✅ 5. subscribe(onNext, onError, onComplete, onSubscribe)
- Use Case: Full control, including backpressure (you can control how many items to request).
- The onSubscribe callback gives access to the Subscription.
    Flux.range(1, 10)
        .subscribe(
            data -> System.out.println("Item: " + data),
            err -> System.err.println("Error: " + err),
            () -> System.out.println("Done"),
            subscription -> {
                System.out.println("Subscribed!");
                // Request only 5 items; nothing further is requested,
                // so items 6-10 and "Done" never arrive
                subscription.request(5);
            }
        );
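What request(n) means for the producer can be seen with the JDK's own Reactive Streams interfaces in java.util.concurrent.Flow. The sketch below is not Reactor and not a spec-complete publisher (for instance, it silently ignores non-positive requests instead of signalling an error); it is a minimal synchronous range source, with BackpressureSketch, range and consume as made-up names, that emits only as many items as were requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Plain-JDK sketch (not Reactor): a tiny publisher that honours request(n).
class BackpressureSketch {

    // Minimal synchronous publisher of 1..count that emits only what was requested.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 1;
            long demand = 0;
            boolean emitting = false;
            boolean done = false;

            @Override
            public void request(long n) {
                if (n <= 0 || done) return;   // simplification: no error signal
                demand += n;
                if (emitting) return;         // re-entrant call: the running loop picks it up
                emitting = true;
                while (demand > 0 && !done && next <= count) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (!done && next > count) {  // everything was emitted
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes, requests `requested` items once, and records every signal it sees.
    static List<String> consume(int count, long requested) {
        List<String> log = new ArrayList<>();
        range(count).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { log.add("Subscribed!"); s.request(requested); }
            @Override public void onNext(Integer item) { log.add("Item: " + item); }
            @Override public void onError(Throwable t) { log.add("Error: " + t); }
            @Override public void onComplete() { log.add("Done"); }
        });
        return log;
    }
}
```

consume(10, 5) records "Subscribed!" followed by items 1 to 5 and no "Done", because the source still holds five unrequested items. consume(3, 5) records all three items and then "Done".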
When to Use What?
Scenario | Recommended subscribe Variant |
---|---|
You don't care about results | subscribe() |
You want only data | subscribe(onNext) |
You want to handle data + errors | subscribe(onNext, onError) |
You want to handle completion | subscribe(onNext, onError, onComplete) |
You want fine control (e.g., backpressure) | subscribe(onNext, onError, onComplete, onSubscribe) |
block() vs subscribe()
Feature | subscribe() | block() |
---|---|---|
Type | Non-blocking | Blocking |
Use in | Reactive chains (preferred) | Testing, bridging to sync |
Thread | Doesn't block calling thread | Blocks current thread |
Real-World Use Cases
Use Case | Approach |
---|---|
Logging events | subscribe(event -> log.info("Event: {}", event)) |
REST controller (WebFlux) | Return Mono/Flux , no need to call subscribe() manually |
Side-effects (e.g., DB insert) | subscribe() with proper error/complete handling |
Integration test (wait result) | Use .block() |
Controlled data consumption | Use 4-arg subscribe() with backpressure |
⏱️ elapsed() – What It Does
The .elapsed() operator measures the time interval between consecutive signals. It transforms each item into a Tuple2<Long, T>:
- Long: time in milliseconds since the previous item was emitted (or since subscription for the first item).
- T: the original value.
Use Case
Use .elapsed() to:
- Debug performance or latency issues
- Profile slow publishers
- Measure time gaps between emissions
- Observe delays in streams
    Flux.just("A", "B", "C")
        .delayElements(Duration.ofMillis(300))
        .elapsed()
        .subscribe(tuple -> System.out.println("Elapsed: " + tuple.getT1() + " ms, Value: " + tuple.getT2()));
⛏️ Breakdown of .elapsed() Internals:
- Under the hood, it reads the clock of a Scheduler (Schedulers.parallel() by default, or one you pass in) and reports the difference between consecutive signals in milliseconds.
- The first item is timed from the moment of subscription.
- Works with Flux or Mono (single value).
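The bookkeeping behind these points is small enough to sketch by hand. The following is an illustrative model only, not Reactor's implementation: ElapsedSketch, Timed and Recorder are hypothetical names, and the clock is injected as a LongSupplier so the behavior can be checked without real delays.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Illustrative model (not Reactor's code): stamp each value with the
// milliseconds elapsed since the previous signal.
class ElapsedSketch {

    // Pairs one value with the time that passed since the previous signal.
    static final class Timed<T> {
        final long elapsedMillis;
        final T value;

        Timed(long elapsedMillis, T value) {
            this.elapsedMillis = elapsedMillis;
            this.value = value;
        }
    }

    // Hypothetical recorder; the clock is injected so tests can control time.
    static final class Recorder<T> {
        private final LongSupplier clockMillis;
        private long last;
        final List<Timed<T>> seen = new ArrayList<>();

        Recorder(LongSupplier clockMillis) {
            this.clockMillis = clockMillis;
            this.last = clockMillis.getAsLong(); // "subscription" time
        }

        void onNext(T value) {
            long now = clockMillis.getAsLong();
            seen.add(new Timed<>(now - last, value));
            last = now;                          // the next item is measured from this one
        }
    }
}
```

Feeding the recorder a clock that reads 0, 300, 600 and 600 produces elapsed values of 300, 300 and 0: each value is measured from the previous signal, and two items emitted in the same millisecond report 0.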
Real-World Example
Measure API call time:
    Mono.fromCallable(() -> makeNetworkCall())
        .elapsed()
        .subscribe(tuple -> {
            System.out.println("API call took: " + tuple.getT1() + " ms, Response: " + tuple.getT2());
        });
Caveats
Limitation | Explanation |
---|---|
Time is per item | Measures between item emissions, not total processing time. |
Doesn't work well without delays | If items are emitted too fast (e.g. Flux.range(1, 5) ), elapsed will be 0 ms for all. |
Can add minor overhead | Should not be used in high-performance paths unless needed. |
❓ Why do we need doOnNext, doOnError, doOnComplete if we already have subscribe()?
Quick Answer:
Because subscribe() is the terminal operation: it starts the reactive pipeline.
But the doOn... methods are non-terminal side-effect hooks used for logging, metrics, debugging, auditing, tracing, etc., without changing the stream.
Conceptual Difference
Feature | doOn... (e.g., doOnNext) | subscribe(...) |
---|---|---|
Purpose | Add side effects while stream flows | Consume the stream and trigger execution |
Type | Non-terminal (intermediate operator) | Terminal operator |
Returns | Still a Mono or Flux | Returns a Disposable |
Use case | Logging, tracing, metrics, hooks | Actual consumption of data |
✅ Example: Difference in use
    Flux.just("A", "B", "C")
        .doOnNext(item -> System.out.println("doOnNext: " + item)) // side effect
        .doOnComplete(() -> System.out.println("doOnComplete")) // side effect
        .subscribe(item -> System.out.println("subscribe: " + item)); // data consumption
Output:
    doOnNext: A
    subscribe: A
    doOnNext: B
    subscribe: B
    doOnNext: C
    subscribe: C
    doOnComplete
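The split between an intermediate side-effect hook and a terminal consumer also exists in java.util.stream, where peek is intermediate and forEach is terminal. The sketch below uses that as a plain-JDK analogy, not Reactor code; PeekSketch is a made-up name. It shows the same two properties: the hook sees each item just before the consumer does, and nothing runs at all until the terminal step is attached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Plain-JDK analogy (not Reactor): peek ~ doOnNext, forEach ~ subscribe.
class PeekSketch {

    // Builds the pipeline and, only if `consume` is true, runs it.
    static List<String> run(boolean consume) {
        List<String> log = new ArrayList<>();
        Stream<String> pipeline = Stream.of("A", "B", "C")
            .peek(item -> log.add("peek: " + item));               // side effect, like doOnNext
        if (consume) {
            pipeline.forEach(item -> log.add("consume: " + item)); // terminal, like subscribe
        }
        return log;
    }
}
```

run(false) returns an empty log because no terminal operation was attached. run(true) returns the entries interleaved per item: peek: A, consume: A, peek: B, consume: B, peek: C, consume: C.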
Common doOn... Methods and Use Cases
Method | Triggered When... | Typical Use |
---|---|---|
doOnSubscribe() | Subscription begins | Log subscription, add tracing context |
doOnNext() | Item is emitted | Logging, debug, metrics |
doOnComplete() | All items successfully emitted | Notify, log, clean-up |
doOnError() | Error is emitted | Log error, metrics, retry logic hooks |
doOnCancel() | Subscription is cancelled | Clean-up, audit |
doOnTerminate() | Stream terminates (complete or error) | Unified logging or cleanup |
doFinally() | Always called last (complete/error/cancel) | Final clean-up, like finally in Java |
❗ Why not just do everything in subscribe(...)?
    .subscribe(
        data -> {
            log.debug("Got item {}", data);
            sendToDB(data);
        },
        error -> {
            log.error("Error", error);
            notifyMonitoring();
        },
        () -> log.info("Done")
    );
✅ You can, but it mixes side-effects with business logic
❌ Harder to test and reason about
✅ Better to use doOnNext() for logging and keep subscribe() clean and purpose-focused.
✅ Best Practice: Use doOn... for Observability, subscribe() for Execution
This separation makes your pipelines:
- Easier to read
- More composable and testable
- Cleaner for production instrumentation (e.g., Micrometer, tracing)
Advanced Tip: You can chain multiple doOnNext() calls!
    flux
        .doOnNext(item -> log.debug("Stage 1: {}", item))
        .map(this::transform)
        .doOnNext(item -> log.debug("Stage 2: {}", item))
Each one sees the stream at that point in the chain, making them perfect for stage-level inspection.
✅ Rule of Thumb:
Task | Use This |
---|---|
Logging | doOnNext , doOnError |
Metrics / Tracing | doOnNext , doOnComplete |
Side effects (non-intrusive) | doOn... methods |
Business logic / API calls | map , flatMap , concatMap , switchMap |
Terminal consumer | subscribe() |
Example with both logging and API call:
    Flux<String> ids = Flux.just("X", "Y", "Z");
    ids.doOnNext(id -> log.debug("Processing ID: {}", id)) // logging side effect
        .flatMap(id -> callApi(id)) // API call
        .map(response -> transform(response)) // transformation
        .doOnNext(result -> log.debug("Transformed: {}", result)) // log result
        .subscribe(finalResult -> sendToNextService(finalResult));
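1. contextCapture() – Bridge for context propagation
What it does:
It captures ThreadLocal values at subscription time (this relies on the Micrometer context-propagation library being on the classpath) and stores them in the Reactor Context, where operators upstream of the call can read them, including across thread boundaries.
Use Case:
You keep some metadata (like requestId) in a ThreadLocal and want it to travel with the subscription so that upstream operators can read it from the Context. If the value is already at hand, contextWrite puts it into the Context directly, which is what the combined example below does.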
All Together
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    import reactor.util.context.Context;

    import java.time.Duration;

    public class ReactiveProductionExample {

        public static void main(String[] args) {
            Flux.defer(() -> Flux.just("request-payload", "request-payload1", "request-payload2"))
                // 1️⃣ subscribeOn sets the thread where subscription (upstream) starts
                .subscribeOn(Schedulers.boundedElastic())
                // 2️⃣ Set up an artificial delay or API call with timeout
                .flatMap(data -> simulateExternalApi(data)
                    .elapsed() // 3️⃣ elapsed: for measuring latency
                    .doOnNext(tuple -> System.out.println("API Time taken: " + tuple.getT1() + "ms"))
                    .map(tuple -> tuple.getT2()) // Extract the actual data
                    .timeout(Duration.ofMillis(500)) // 4️⃣ timeout protection
                    .onErrorResume(error -> fallback(data, error)) // 5️⃣ graceful fallback
                )
                // 6️⃣ publishOn switches downstream flow to a new thread (e.g., CPU intensive)
                .publishOn(Schedulers.parallel())
                // 7️⃣ doOn methods for side effects: logging, metrics (not modifying flow)
                .doOnNext(result -> System.out.println("[Result Ready]: " + result))
                .doOnError(err -> System.err.println("[ERROR]: " + err.getMessage()))
                .doOnSubscribe(subscription -> System.out.println("[Subscribed]"))
                .doOnComplete(() -> System.out.println("[Success]"))
                // 8️⃣ contextWrite for logging/tracing IDs
                .contextWrite(Context.of("requestId", "REQ-123"))
                // 9️⃣ Finally subscribe to trigger the chain
                .subscribe(
                    data -> System.out.println("Final data: " + data),
                    error -> System.err.println("Terminal Error: " + error),
                    () -> System.out.println("Done")
                );

            // Sleep to allow async threads to complete
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        static Mono<String> simulateExternalApi(String data) {
            return Mono.fromCallable(() -> {
                Thread.sleep(300); // Simulate delay
                return "response-for-" + data;
            }).subscribeOn(Schedulers.boundedElastic());
        }

        static Mono<String> fallback(String input, Throwable error) {
            System.err.println("[Fallback triggered for: " + input + "]");
            return Mono.just("fallback-for-" + input);
        }
    }
Output:
    [Subscribed]
    API Time taken: 303ms
    [Result Ready]: response-for-request-payload
    Final data: response-for-request-payload
    API Time taken: 303ms
    API Time taken: 305ms
    [Result Ready]: response-for-request-payload1
    Final data: response-for-request-payload1
    [Result Ready]: response-for-request-payload2
    Final data: response-for-request-payload2
    [Success]
    Done