Skip to main content

Project Reactor Important Methods Cheat Sheet

 

๐Ÿ”น 1️⃣ subscribeOn – "Decides WHERE the Pipeline Starts"

๐Ÿ“ Definition:

  • subscribeOn influences the thread where the data source (upstream) (e.g., data generation, API calls) runs.

  • It affects the source and everything downstream (until a publishOn switches it).


Flux<Integer> flux = Flux.range(1, 3)
    .doOnNext(i -> System.out.println("[Generating] " + i + " on " + Thread.currentThread().getName()))
    .subscribeOn(Schedulers.boundedElastic()) // Change starting thread
    .map(i -> {
        System.out.println("[Processing] " + i + " on " + Thread.currentThread().getName());
        return i * 10;
    });

flux.blockLast();


Output 

[Generating] 1 on boundedElastic-1
[Processing] 1 on boundedElastic-1
[Generating] 2 on boundedElastic-1
[Processing] 2 on boundedElastic-1
[Generating] 3 on boundedElastic-1
[Processing] 3 on boundedElastic-1



๐Ÿ“ข Key Insight:

  • subscribeOn affects the beginning (source) and flows down.

  • subscribeOn doesn’t just switch one stage; it affects everything downstream unless overridden by publishOn.




2️⃣ publishOn – "Switches the Thread at a Certain Point"

๐Ÿ“ Definition:

  • publishOn switches the thread context at the point where it appears.

  • It affects only downstream operators.



Flux<Integer> flux = Flux.range(1, 3)
    .doOnNext(i -> System.out.println("[Generating] " + i + " on " + Thread.currentThread().getName()))
    .publishOn(Schedulers.parallel()) // Switch thread here
    .map(i -> {
        System.out.println("[Processing] " + i + " on " + Thread.currentThread().getName());
        return i * 10;
    });

flux.blockLast();






[Generating] 1 on main
[Generating] 2 on main
[Generating] 3 on main
[Processing] 1 on parallel-1
[Processing] 2 on parallel-1
[Processing] 3 on parallel-1


Key Insight:

  • publishOn switches thread only from where it’s placed.

  • Upstream (before publishOn) stays on original thread (e.g., main).




Combining Both: Full Example






Flux<Integer> flux = Flux.range(1, 3)
    .doOnNext(i -> System.out.println("[Source] " + i + " on " + Thread.currentThread().getName()))
    .subscribeOn(Schedulers.boundedElastic()) // Run source on boundedElastic
    .publishOn(Schedulers.parallel()) // Switch context for downstream
    .map(i -> {
        System.out.println("[Processing] " + i + " on " + Thread.currentThread().getName());
        return i * 10;
    })
    .publishOn(Schedulers.single()) // Switch context again
    .doOnNext(i -> System.out.println("[Final] " + i + " on " + Thread.currentThread().getName()));

flux.blockLast();


[Source] 1 on boundedElastic-1
[Source] 2 on boundedElastic-1
[Source] 3 on boundedElastic-1
[Processing] 1 on parallel-1
[Processing] 2 on parallel-1
[Processing] 3 on parallel-1
[Final] 10 on single-1
[Final] 20 on single-1
[Final] 30 on single-1

Feature subscribeOn publishOn
Thread Affected Affects source and downstream Affects only downstream from where used
Usage To control source/initial thread To switch context mid-pipeline
Common Use Case I/O sources (DB, network calls) Switching from CPU-bound to I/O or vice versa
Placement Impact Works wherever it is, but affects source Works where it is, affects downstream





------------------------------------------End-------------------------------------


๐Ÿš€ 1️⃣ What is defer in Reactor?

๐Ÿ“Œ Definition:

  • Mono.defer or Flux.defer delays the creation of the actual Publisher (Mono/Flux) until a subscriber subscribes.

  • Every subscriber gets a new instance of the Publisher, which is created lazily.

๐Ÿ”‘ This is especially useful when you:

  • Want to wrap a method call (like DB access, event publishing, etc.).

  • Ensure the call happens only when subscribed (and not during definition of the pipeline).

  • Ensure threading behavior (as subscribeOn affects from the point of subscription).



2️⃣ Example: Without defer

Let’s say we call a DB method that returns a Mono.


public Mono<String> getDataFromDb() {

    System.out.println("Calling DB on thread: " + Thread.currentThread().getName());

    return Mono.just("Data");

}


Mono<String> result = getDataFromDb()

    .subscribeOn(Schedulers.boundedElastic())

    .doOnNext(data -> System.out.println("Got: " + data + " on " + Thread.currentThread().getName()));


result.subscribe();


Calling DB on thread: main
Got: Data on boundedElastic-1


Observation:

  • getDataFromDb() is called immediately, when defining result.

  • Even though subscribeOn is used, the method is called on main thread.



๐Ÿ”ฅ 3️⃣ Example: With defer


Mono<String> result = Mono.defer(() -> {
    System.out.println("Calling DB on thread: " + Thread.currentThread().getName());
    return Mono.just("Data");
})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> System.out.println("Got: " + data + " on " + Thread.currentThread().getName()));

result.subscribe();


Calling DB on thread: boundedElastic-1
Got: Data on boundedElastic-1



Observation:

  • Now, defer delays the DB call until subscription.

  • Because subscribeOn controls the thread where subscription happens, the DB call is made on the boundedElastic thread.



4️⃣ defer + publishOn – What Happens?


Mono<String> result = Mono.defer(() -> {
    System.out.println("Calling DB on thread: " + Thread.currentThread().getName());
    return Mono.just("Data");
})
.subscribeOn(Schedulers.boundedElastic()) // Controls where subscription & upstream happen
.publishOn(Schedulers.parallel())         // Switch thread for downstream
.map(data -> {
    System.out.println("Processing on thread: " + Thread.currentThread().getName());
    return data.toUpperCase();
})
.doOnNext(data -> System.out.println("Final: " + data + " on " + Thread.currentThread().getName()));

result.subscribe();



Calling DB on thread: boundedElastic-1
Processing on thread: parallel-1
Final: DATA on parallel-1




Observation:

  • defer ensures DB call is on boundedElastic (because of subscribeOn).

  • publishOn switches thread after DB call (for downstream processing)




Concept Meaning
defer Wraps a supplier so it’s called lazily at subscription time, not pipeline definition.
subscribeOn Controls thread of subscription & upstream (including defer calls).
publishOn Switches thread from that point downstream.




---------------------------------End--------------------------------


๐Ÿš€ Schedulers in Reactor: Deep Dive

Schedulers are the backbone of threading in Reactor. They control where and how your reactive pipeline’s tasks run.

Let’s start by listing common Reactor Schedulers:

  • Schedulers.immediate()

  • Schedulers.single()

  • Schedulers.parallel()

  • Schedulers.boundedElastic()

  • Schedulers.newSingle(name)

  • Schedulers.fromExecutorService(executor)


๐ŸŽฏ 1️⃣ Schedulers.immediate()

๐Ÿ”‘ Definition:
Runs tasks on the current thread (no thread switch).

๐Ÿ” Use Case:

  • When you don’t want threading overhead (for example, when already on a background thread).

  • For debugging or very lightweight tasks.





Mono.just("data")
    .subscribeOn(Schedulers.immediate())
    .doOnNext(d -> System.out.println("Immediate: " + Thread.currentThread().getName()))
    .subscribe();

Output 

Immediate: main


๐ŸŽฏ 2️⃣ Schedulers.single()

๐Ÿ”‘ Definition:
A single-threaded scheduler backed by one thread (shared across all usages).

๐Ÿ” Use Case:

  • Sequential task processing (e.g., writing logs, file writes, or certain transactional operations).

  • Avoids concurrency issues.

Example 

Mono.just("single-threaded")
    .subscribeOn(Schedulers.single())
    .doOnNext(d -> System.out.println("Single: " + Thread.currentThread().getName()))
    .subscribe();

Single: reactor-single-1


๐ŸŽฏ 3️⃣ Schedulers.parallel()

๐Ÿ”‘ Definition:
A fixed-size thread pool, typically based on the number of CPU cores (Runtime.getRuntime().availableProcessors()).

๐Ÿ” Use Case:

  • CPU-bound tasks (e.g., intensive calculations, transformation logic).

  • Leverage multiple cores for parallelism.


Flux.range(1, 5)
    .parallel()  // Makes parallel flux
    .runOn(Schedulers.parallel())
    .doOnNext(i -> System.out.println("Parallel: " + i + " on " + Thread.currentThread().getName()))
    .sequential()
    .subscribe();


Output
Parallel: 1 on parallel-1
Parallel: 2 on parallel-2
Parallel: 3 on parallel-1
Parallel: 4 on parallel-2
Parallel: 5 on parallel-1



๐ŸŽฏ 4️⃣ Schedulers.boundedElastic()

๐Ÿ”‘ Definition:
An elastic thread pool with limits. Creates threads as needed (like newCachedThreadPool) but bounded to avoid resource exhaustion.

๐Ÿ” Use Case:

  • I/O-bound operations:

    • Blocking DB calls (JDBC)

    • File reads/writes

    • Calling external APIs

  • When the number of concurrent threads could be large but controlled.



Mono.fromCallable(() -> {
    Thread.sleep(1000); // Simulate blocking call
    return "Elastic data";
})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(d -> System.out.println("Elastic: " + d + " on " + Thread.currentThread().getName()))
.subscribe();


Elastic: Elastic data on boundedElastic-1



๐ŸŽฏ 5️⃣ Schedulers.newSingle("custom")

๐Ÿ”‘ Definition:
Creates a new single-threaded Scheduler with a custom thread name.

๐Ÿ” Use Case:

  • When you need an isolated thread for a specific purpose (e.g., a dedicated logger or a sensitive task).



Scheduler myScheduler = Schedulers.newSingle("my-thread");

Mono.just("custom")
    .subscribeOn(myScheduler)
    .doOnNext(d -> System.out.println("Custom: " + d + " on " + Thread.currentThread().getName()))
    .subscribe();

Custom: custom on my-thread-1


๐ŸŽฏ 6️⃣ Schedulers.fromExecutorService()

๐Ÿ”‘ Definition:
Wraps a custom ExecutorService (e.g., a Java thread pool) into a Scheduler.

๐Ÿ” Use Case:

  • When integrating with an existing thread pool (from legacy systems or custom configurations).

๐Ÿ’ป Example:


ExecutorService executor = Executors.newFixedThreadPool(4);

Scheduler customScheduler = Schedulers.fromExecutorService(executor);


Mono.just("custom-executor")

    .subscribeOn(customScheduler)

    .doOnNext(d -> System.out.println("Executor: " + d + " on " + Thread.currentThread().getName()))

    .subscribe();



Scheduler Threads Use Case
immediate Current thread No threading
single 1 shared thread Sequential tasks
parallel CPU cores CPU-bound parallel tasks
boundedElastic Dynamic, bounded pool I/O-bound, blocking tasks
newSingle("name") 1 new thread Isolated, named thread
fromExecutorService Custom pool Integrate legacy thread pools


------------------------End----------------------------


๐Ÿ”ฅ map vs flatMap in Reactor: Detailed Breakdown




1️⃣ map()

๐Ÿ“š What is it?

  • Synchronous transformation of data.

  • It maps (transforms) one value to another in a one-to-one fashion.

  • Input type → Output type transformation.

๐Ÿ”ฅ Key Points:

  • Takes a Function<T, R>: input type T to output type R.

  • Runs in the same thread where it’s called (no async switch).

  • Great for simple data transformations (e.g., uppercasing a string, adding a number).


Mono<String> mono = Mono.just("hello");

mono.map(s -> s.toUpperCase())  // Transform "hello" → "HELLO"
    .subscribe(result -> System.out.println("Result: " + result));


Output 
Result: HELLO



๐ŸŽฏ 2️⃣ flatMap()

๐Ÿ“š What is it?

  • Asynchronous or nested transformation.

  • It maps a value to a Publisher (Mono or Flux), and then flattens the result.

  • Useful for chaining async calls (like DB access, web calls).

๐Ÿ”ฅ Key Points:

  • Takes a Function<T, Publisher<R>>.

  • It flattens nested streams into a single stream.

  • Often used for asynchronous workflows where each step depends on the result of the previous one.

๐Ÿ’ป Example:

Mono<String> mono = Mono.just("user123");

// Simulate async DB call
mono.flatMap(id -> Mono.just("User Name for " + id))
    .subscribe(result -> System.out.println("Result: " + result));


Output 

Result: User Name for user123


Flattening Example (No Async, No Threading)

Here’s a basic flattening example:


Flux<String> source = Flux.just("A", "B", "C");


// flatMap each letter to a Flux of two letters (like "A" → ["A1", "A2"])

Flux<String> flattened = source.flatMap(letter -> Flux.just(letter + "1", letter + "2"));


flattened.subscribe(System.out::println);



What does "flattening" mean in flatMap?
When you call flatMap, it maps each item to a new Publisher (Mono/Flux), and then flattens those inner Publishers into the main stream.

Imagine:

You start with Mono<T> or Flux<T>.

Each T maps to a Publisher<R> (Mono or Flux).

Instead of ending up with nested Publishers (Mono<Mono<R>> or Flux<Flux<R>>), Reactor "flattens" them into Mono<R> or Flux<R>.


Flux<T>  --flatMap--> Flux<Publisher<R>>  ---flatten---> Flux<R>
Mono<T>  --flatMap--> Mono<Publisher<R>>  ---flatten---> Mono<R>


Async Example Without Explicit subscribeOn

Now, what if the inner Publisher itself is asynchronous (e.g., network call, DB call, timer)? Even if you don’t specify subscribeOn, the inner Publisher’s behavior can introduce asynchrony.



Flux<String> source = Flux.just("A", "B", "C");


// Simulate async operation using delayElements

Flux<String> flattened = source.flatMap(letter ->

    Flux.just(letter + "1", letter + "2").delayElements(Duration.ofMillis(100))

);


flattened.subscribe(result -> 

    System.out.println("Received: " + result + " on " + Thread.currentThread().getName())

);



What’s Happening?

  • The outer Flux is processed synchronously.

  • The inner Flux has delayElements, which schedules its elements on the default timer scheduler in Reactor (async).

  • This introduces asynchrony, even though we didn’t call subscribeOn.


Received: A1 on reactor-timer_1
Received: A2 on reactor-timer_1
Received: B1 on reactor-timer_1
Received: B2 on reactor-timer_1
Received: C1 on reactor-timer_1
Received: C2 on reactor-timer_1


 Why Async Without Explicit Threading?

Reactor supports asynchronous operators (like delayElements, fromFuture, etc.) that internally use Schedulers.

✅ Even if you don’t specify subscribeOn, these operators introduce scheduling (e.g., Reactor’s timer scheduler for delayElements).

✅ So, flatMap’s inner Publishers can be async if the operation inside is async.




Feature map flatMap
Sync/Async Synchronous Asynchronous or nested
Transformation One-to-one One-to-many (flattens inner Publisher)
Threading Same thread Depends on inner publisher's thread
Use case Simple transformation Chaining async/complex calls




----------------------------_----End---------------------------



1️⃣ What is timeout in Reactor?

timeout is an operator used in Project Reactor (Mono/Flux) to specify the maximum time you’re willing to wait for an item from the upstream Publisher.

  • If the upstream doesn’t emit within the specified duration, it terminates the sequence with a TimeoutException (or can optionally switch to a fallback sequence).

๐Ÿ“š Analogy:

Imagine you’re waiting for a pizza delivery. You say:
“I’ll wait 30 minutes. If it’s not here by then, I’ll order from a different place.”


2️⃣ Basic Usage of timeout

Example 1: Simple Timeout



Flux<String> slowFlux = Flux.just("A", "B", "C")
    .delayElements(Duration.ofSeconds(2));

slowFlux
    .timeout(Duration.ofSeconds(1))  // If any element takes >1s, error
    .subscribe(
        item -> System.out.println("Received: " + item),
        err -> System.err.println("Error: " + err)
    );


Explanation:

  • delayElements(2s) delays each item by 2 seconds.

  • timeout(1s) expects each element within 1 second.

  • If any item is delayed beyond 1s, it triggers TimeoutException.


3️⃣ Timeout With Fallback

Instead of terminating with an error, we can switch to a fallback:



slowFlux

    .timeout(Duration.ofSeconds(1), Flux.just("Fallback", "Default"))

    .subscribe(

        item -> System.out.println("Received: " + item),

        err -> System.err.println("Error: " + err)

    );




๐Ÿ”Ž Now, if a timeout occurs:

  • Instead of error, the stream switches to Flux.just("Fallback", "Default").


 How Does It Work Under the Hood?

  • timeout internally sets up a timer (Reactor uses a Scheduler, usually parallel or timer).

  • If an item does not arrive in time, it:

    • Either emits TimeoutException (default behavior).

    • Or cancels the upstream and switches to a fallback Publisher (if provided).

๐Ÿ“Œ Threading: The timeout timer runs on a separate scheduler (usually timer or parallel), not blocking the main thread.


Operator Description
timeout(Duration) Waits max Duration for next item, errors on timeout.
timeout(Duration, Publisher fallback) On timeout, switch to fallback Publisher instead of error.
timeout(Publisher firstTimeout) Timeout is determined by emissions from firstTimeout.
timeout(Publisher first, Function) Dynamically control timeouts per item.


Example: Dynamic Timeout Per Item


Flux<String> items = Flux.just("A", "B", "C");


// Each item has a custom timeout: "A" gets 1s, "B" gets 2s, etc.

items

    .timeout(

        item -> {

            if (item.equals("A")) return Mono.delay(Duration.ofSeconds(1));

            else return Mono.delay(Duration.ofSeconds(2));

        }

    )

    .subscribe(

        System.out::println,

        err -> System.err.println("Error: " + err)

    );

⚠️ 6️⃣ Common Pitfalls

1️⃣ Timeout Is Per Item:
Timeout applies to each item individually. So even if the stream emits items regularly, any item that takes too long triggers timeout.

2️⃣ Scheduler Overhead:
Each timeout sets up a timer. For very high-frequency streams, too many timers can cause overhead.

3️⃣ Fallback Must Be Reactive:
If you use fallback, it must be a Publisher, not a static value.



Concept Key Points
timeout Sets max wait time for each item. Emits error or switches fallback.
Fallback Optional Publisher to continue on timeout instead of error.
Dynamic timeout Custom timeout per item using Function.
Threading Uses internal Scheduler (e.g., timer) for managing timeout logic.
Use Cases DB queries, WebClient calls, external systems with variable response times.


๐Ÿ”ฅ Key onError Methods:


Method Purpose Example
onErrorReturn(T fallback) Return a fallback value when an error occurs .onErrorReturn("Fallback value")
onErrorResume(Function) Switch to a fallback Publisher (Mono/Flux) when an error occurs .onErrorResume(e -> Flux.just("Fallback"))
onErrorMap(Function) Transform the error into another error (e.g., wrapping) .onErrorMap(e -> new CustomException(e))
onErrorContinue(BiConsumer) Log or skip the error and continue with the next item .onErrorContinue((e, item) -> log.warn(...))
doOnError(Consumer) Side-effect: perform an action on error but don’t recover .doOnError(e -> log.error("Error: " + e))



⚠️ onErrorReturn vs onErrorResume

  • onErrorReturn: emits one fallback value and terminates.

  • onErrorResume: switches to a new sequence (Flux/Mono) and continues.



-------------------------------------End---------------------



2️⃣ subscribe() Overloads in Reactor



๐Ÿ” subscribe() in Project Reactor

In Reactor, nothing happens until you call subscribe(). It is a terminal operation that starts the data flow in the reactive pipeline (aka subscription to the Publisher).




subscribe();

subscribe(Consumer<? super T> consumer);

subscribe(Consumer<? super T> consumer,

          Consumer<? super Throwable> errorConsumer);

subscribe(Consumer<? super T> consumer,

          Consumer<? super Throwable> errorConsumer,

          Runnable completeConsumer);

subscribe(Consumer<? super T> consumer,

          Consumer<? super Throwable> errorConsumer,

          Runnable completeConsumer,

          Consumer<? super Subscription> subscriptionConsumer);


✅ 1. subscribe()

  • Use Case: Fire-and-forget (e.g., logging, debugging, testing).

  • No way to observe emitted values or errors.




Flux.just("A", "B", "C")
    .map(String::toLowerCase)
    .subscribe();



✅ 2. subscribe(Consumer<T> onNext)

  • Use Case: You care about the data but not errors or completion.

  • Best for simple flows like logging results.

Mono.just("Hello")
    .subscribe(data -> System.out.println("Received: " + data));


✅ 3. subscribe(onNext, onError)

  • Use Case: Want to handle data and errors.

  • Recommended for robust code.



Mono.just("A")
    .map(s -> { throw new RuntimeException("Boom!"); })
    .subscribe(
        data -> System.out.println("Received: " + data),
        error -> System.err.println("Error: " + error.getMessage())
    );


✅ 4. subscribe(onNext, onError, onComplete)

  • Use Case: You want to know when the stream completes successfully.



Flux.range(1, 3)
    .subscribe(
        data -> System.out.println("Got: " + data),
        error -> System.err.println("Error: " + error),
        () -> System.out.println("Stream completed.")
    );




✅ 5. subscribe(onNext, onError, onComplete, onSubscribe)

  • Use Case: Full control — including backpressure (you can control how many items to request).

  • The onSubscribe gives access to the Subscription.







Flux.range(1, 10)
    .subscribe(
        data -> System.out.println("Item: " + data),
        err -> System.err.println("Error: " + err),
        () -> System.out.println("Done"),
        subscription -> {
            System.out.println("Subscribed!");
            subscription.request(5); // Request only 5 items
        }
    );



๐Ÿ› ️ When to Use What?


Scenario Recommended subscribe Variant
You don't care about results subscribe()
You want only data subscribe(onNext)
You want to handle data + errors subscribe(onNext, onError)
You want to handle completion subscribe(onNext, onError, onComplete)
You want fine control (e.g., backpressure) subscribe(onNext, onError, onComplete, onSubscribe)






๐Ÿ” block() vs subscribe()


Feature subscribe() block()
Type Non-blocking Blocking
Use in Reactive chains (preferred) Testing, bridging to sync
Thread Doesn’t block calling thread Blocks current thread






๐Ÿ“ฆ Real-World Use Cases



Use Case Approach
Logging events subscribe(event -> log.info("Event: {}", event))
REST controller (WebFlux) Return Mono/Flux, no need to call subscribe() manually
Side-effects (e.g., DB insert) subscribe() with proper error/complete handling
Integration test (wait result) Use .block()
Controlled data consumption Use 4-arg subscribe() with backpressure







---------------------------------End-----------------


⏱️ elapsed() — What It Does

Flux<T> → Flux<Tuple2<Long, T>>

The .elapsed() operator measures the time interval between when a value is requested and when it is emitted. It transforms each item into a Tuple2<Long, T>:

  • Long — time in milliseconds since the previous item was emitted (or since subscription for the first item).

  • T — the original value.


๐Ÿ“˜ Use Case

Use .elapsed() to:

  • Debug performance or latency issues

  • Profile slow publishers

  • Measure time gaps between emissions

  • Observe delays in streams

Flux.just("A", "B", "C")
    .delayElements(Duration.ofMillis(300))
    .elapsed()
    .subscribe(tuple -> System.out.println("Elapsed: " + tuple.getT1() + " ms, Value: " + tuple.getT2()));



⛏️ Breakdown of .elapsed() Internals:

  • Under the hood, it uses System.nanoTime() to calculate time difference between signals.

  • The first item is timed from the moment of subscription.

  • Works with Flux or Mono (single value).

๐Ÿ‘€ Real-World Example

Measure API call time:


Mono.fromCallable(() -> makeNetworkCall()) .elapsed() .subscribe(tuple -> { System.out.println("API call took: " + tuple.getT1() + " ms, Response: " + tuple.getT2()); });

Caveats


Limitation Explanation
Time is per item Measures between item emissions, not total processing time.
Doesn’t work well without delays If items are emitted too fast (e.g. Flux.range(1, 5)), elapsed will be 0 ms for all.
Can add minor overhead Should not be used in high-performance paths unless needed.
-------------------End----------------------------



❓ Why do we need doOnNext, doOnError,

doOnComplete if we already have subscribe()?

๐Ÿค” Quick Answer:

Because subscribe() is the terminal operation — it starts the reactive pipeline.
But the doOn... methods are non-terminal side-effect hooks used for logging, metrics,

debugging, auditing, tracing, etc., without changing the stream.


๐Ÿง  Conceptual Difference


Feature doOn... (e.g., doOnNext) subscribe(...)
Purpose Add side effects while stream flows Consume the stream and trigger execution
Type Non-terminal (intermediate operator) Terminal operator
Returns Still a Mono or Flux Returns a Disposable
Use case Logging, tracing, metrics, hooks Actual consumption of data



✅ Example: Difference in use

Flux.just("A", "B", "C") .doOnNext(item -> System.out.println("doOnNext: " + item)) // side effect .doOnComplete(() -> System.out.println("doOnComplete")) // side effect .subscribe(item -> System.out.println("subscribe: " + item)); // data consumption


Ouput
doOnNext: A subscribe: A doOnNext: B subscribe: B doOnNext: C subscribe: C doOnComplete


Common doOn... Methods and Use Cases

Method Triggered When... Typical Use
doOnSubscribe() Subscription begins Log subscription, add tracing context
doOnNext() Item is emitted Logging, debug, metrics
doOnComplete() All items successfully emitted Notify, log, clean-up
doOnError() Error is emitted Log error, metrics, retry logic hooks
doOnCancel() Subscription is cancelled Clean-up, audit
doOnTerminate() Stream terminates (complete or error) Unified logging or cleanup
doFinally() Always called last (complete/error/cancel) Final clean-up, like finally in Java



❗ Why not just do everything in subscribe(...)?

.subscribe(
    data -> { log.debug("Got item {}", data); sendToDB(data); },
    error -> { log.error("Error", error); notifyMonitoring(); },
    () -> log.info("Done")
);


✅ You can — but it mixes side-effects with business logic
❌ Harder to test and reason about
✅ Better to use doOnNext() for logging and keep subscribe() clean and purpose-focused.




✅ Best Practice: Use doOn... for Observability, subscribe() for Execution

This separation makes your pipelines:

  • Easier to read

  • More composable and testable

  • Cleaner for production instrumentation (e.g., Micrometer, tracing)


๐Ÿงช Advanced Tip: You can chain multiple doOnNext() calls!


flux
    .doOnNext(item -> log.debug("Stage 1: {}", item))
    .map(this::transform)
    .doOnNext(item -> log.debug("Stage 2: {}", item))


Each one sees the stream at that point in the chain, making them perfect for stage-level inspection


✅ Rule of Thumb:


Task Use This
Logging doOnNext, doOnError
Metrics / Tracing doOnNext, doOnComplete
Side effects (non-intrusive) doOn... methods
Business logic / API calls map, flatMap, concatMap, switchMap
Terminal consumer subscribe()


๐Ÿ” Example with both logging and API call:

Flux<String> ids = Flux.just("X", "Y", "Z");

ids.doOnNext(id -> log.debug("Processing ID: {}", id)) // logging side effect
   .flatMap(id -> callApi(id))                          // API call
   .map(response -> transform(response))                // transformation
   .doOnNext(result -> log.debug("Transformed: {}", result)) // log result
   .subscribe(finalResult -> sendToNextService(finalResult));



-----------------------------------End----------------------


๐Ÿง  1. contextCapture() — Bridge for context propagation

๐Ÿ” What it does:

It captures the Reactor context at the point it’s called and ensures it's available downstream, especially across thread boundaries.


๐Ÿ“ฆ Use Case:

You’ve added some metadata (like requestId) in the Context, and want to ensure it's available inside doOnNext, flatMap, etc.

-------------End---------------------------

All Together 





import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

import java.time.Duration;

public class ReactiveProductionExample {

public static void main(String[] args) {
Flux.defer(() -> Flux.just("request-payload","request-payload1","request-payload2"))

// 1️⃣ SubscribeOn sets the thread where subscription (upstream) starts
.subscribeOn(Schedulers.boundedElastic())

// 2️⃣ Setup an artificial delay or API call with timeout
.flatMap(data -> simulateExternalApi(data)
.elapsed() // 3️⃣ elapse - for measuring latency
.doOnNext(tuple -> System.out.println("API Time taken: " + tuple.getT1() + "ms"))
.map(tuple -> tuple.getT2()) // Extract the actual data
.timeout(Duration.ofMillis(500)) // 4️⃣ timeout protection
.onErrorResume(error -> fallback(data, error)) // 5️⃣ graceful fallback
)

// 6️⃣ PublishOn switches downstream flow to new thread (e.g., CPU intensive)
.publishOn(Schedulers.parallel())

// 7️⃣ doOn methods for side effects — logging, metrics (not modifying flow)
.doOnNext(result -> System.out.println("[Result Ready]: " + result))
.doOnError(err -> System.err.println("[ERROR]: " + err.getMessage()))
.doOnSubscribe(subscription -> System.out.println("[Subscribed]"))
// .doOnSuccess(r -> System.out.println("[Success]") )
.doOnComplete(() -> System.out.println("[Success]") )

// 8️⃣ contextWrite for logging/tracing IDs
.contextWrite(Context.of("requestId", "REQ-123"))

// 9️⃣ Finally subscribe to trigger the chain
.subscribe(
data -> System.out.println("Final data: " + data),
error -> System.err.println("Terminal Error: " + error),
() -> System.out.println("Done")
);

// Sleep to allow async threads to complete
try { Thread.sleep(2000); } catch (Exception e) {}
}

static Mono<String> simulateExternalApi(String data) {
return Mono.fromCallable(() -> {
Thread.sleep(300); // Simulate delay
return "response-for-" + data;
}).subscribeOn(Schedulers.boundedElastic());
}

static Mono<String> fallback(String input, Throwable error) {
System.err.println("[Fallback triggered for: " + input + "]");
return Mono.just("fallback-for-" + input);
}
}

[Subscribed]
API Time taken: 303ms
[Result Ready]: response-for-request-payload
Final data: response-for-request-payload
API Time taken: 303ms
API Time taken: 305ms
[Result Ready]: response-for-request-payload1
Final data: response-for-request-payload1
[Result Ready]: response-for-request-payload2
Final data: response-for-request-payload2
[Success]
Done






















Comments

Popular posts from this blog

Mastering Java Logging: A Guide to Debug, Info, Warn, and Error Levels

Comprehensive Guide to Java Logging Levels: Trace, Debug, Info, Warn, Error, and Fatal Comprehensive Guide to Java Logging Levels: Trace, Debug, Info, Warn, Error, and Fatal Logging is an essential aspect of application development and maintenance. It helps developers track application behavior and troubleshoot issues effectively. Java provides various logging levels to categorize messages based on their severity and purpose. This article covers all major logging levels: Trace , Debug , Info , Warn , Error , and Fatal , along with how these levels impact log printing. 1. Trace The Trace level is the most detailed logging level. It is typically used for granular debugging, such as tracking every method call or step in a complex computation. Use this level sparingly, as it can generate a large volume of log data. 2. Debug The Debug level provides detailed information useful during dev...

Choosing Between Envoy and NGINX Ingress Controllers for Kubernetes

As Kubernetes has become the standard for deploying containerized applications, ingress controllers play a critical role in managing how external traffic is routed to services within the cluster. Envoy and NGINX are two of the most popular options for ingress controllers, and each has its strengths, weaknesses, and ideal use cases. In this blog, we’ll explore: How both ingress controllers work. A detailed comparison of their features. When to use Envoy vs. NGINX for ingress management. What is an Ingress Controller? An ingress controller is a specialized load balancer that: Manages incoming HTTP/HTTPS traffic. Routes traffic to appropriate services based on rules defined in Kubernetes ingress resources. Provides features like TLS termination, path-based routing, and host-based routing. How Envoy Ingress Controller Works Envoy , initially built by Lyft, is a high-performance, modern service proxy and ingress solution. Here's how it operates in Kubernetes: Ingress Resource : You d...

Understanding API Parameters in Spring Boot

Understanding API Parameters in Spring Boot When designing APIs in Spring Boot, it's essential to understand how to handle different types of parameters. These parameters define how data is sent from the client to the server. Let's break down the common types of parameters used in API development, with examples and cURL commands. 1. Types of Parameters Parameter Type Location Use Case Example Format Annotation in Spring Boot Query Param URL after `?` Filtering, Pagination ?key=value @RequestParam Path Param In the URL path Identifying specific resource /resource/{id} @PathVariable Form Param Form-encoded body Simple form submissions ...