🎯 In Project Reactor, operators are the core tools you use to transform, filter, merge, delay, or otherwise manipulate data as it flows through a reactive pipeline.
Think of operators as the “verbs” in your reactive code — each one defines how the data should be handled.
⚙️ What is an Operator in Project Reactor?
An operator is a method that you chain on a Flux or Mono to define how data should be:
- Transformed
- Filtered
- Delayed
- Combined
- Scheduled
- Handled on error or completion
All operators are lazy: they don’t execute until the Flux or Mono is subscribed to.
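To make the laziness concrete, here is a minimal sketch (the class name LazyDemo and the log list are illustrative assumptions, not part of Reactor). It records when the map function actually runs relative to assembling the pipeline:

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

public class LazyDemo {

    // Returns the recorded events so the ordering can be inspected.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Assembling the pipeline only describes the work; map() does not run yet.
        Flux<Integer> pipeline = Flux.range(1, 3)
                .map(i -> {
                    log.add("map " + i);
                    return i * 2;
                });

        log.add("assembled");

        // Subscribing is what triggers the source and the operators.
        pipeline.subscribe(i -> log.add("received " + i));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first entry in the log is "assembled", followed by alternating "map" and "received" entries, because Flux.range emits synchronously on the subscribing thread.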
🔧 Categories of Common Operators
Let’s break them down with examples:
🔁 Transformation Operators
Operator | Purpose | Example |
---|---|---|
map | Transform each element | flux.map(i -> i * 2) |
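flatMap | Async transform + flatten; inner results may interleave | flux.flatMap(this::callService) |
concatMap | Like flatMap, but runs inner publishers one at a time and preserves order | flux.concatMap(...) |
switchMap | Switch to the newest inner stream, cancelling the previous one | flux.switchMap(...) |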
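The difference between map, flatMap, and concatMap is easier to see side by side. The sketch below assumes a hypothetical callService method that simulates an async call whose later inputs finish sooner; it is not a real service API.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;

public class TransformDemo {

    // Hypothetical async call: id 1 takes 300 ms, id 2 takes 200 ms, id 3 takes 100 ms.
    static Mono<String> callService(int id) {
        return Mono.just("item-" + id)
                .delayElement(Duration.ofMillis(100L * (4 - id)));
    }

    // map: synchronous one-to-one transformation.
    static List<Integer> mapped() {
        return Flux.range(1, 3).map(i -> i * 2).collectList().block();
    }

    // concatMap: waits for each inner Mono before starting the next, so order is kept.
    static List<String> concatMapped() {
        return Flux.range(1, 3).concatMap(TransformDemo::callService).collectList().block();
    }

    // flatMap: subscribes to inner Monos eagerly; results arrive as they complete.
    static List<String> flatMapped() {
        return Flux.range(1, 3).flatMap(TransformDemo::callService).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(mapped());
        System.out.println(concatMapped());
        System.out.println(flatMapped());
    }
}
```

With these delays, flatMap will typically deliver the fastest result first, while concatMap returns the items in source order at the cost of running the calls sequentially.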
🧹 Filtering Operators
Operator | Purpose | Example |
---|---|---|
filter | Only pass values that match a condition | flux.filter(i -> i % 2 == 0) |
distinct | Remove duplicates | flux.distinct() |
take(n) | Take only first n elements | flux.take(3) |
skip(n) | Skip first n elements | flux.skip(2) |
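As a rough illustration, the four filtering operators from the table can be chained in one pipeline. The input values and the class name FilterDemo are arbitrary choices for this sketch; the comments trace what survives each step.

```java
import reactor.core.publisher.Flux;

import java.util.List;

public class FilterDemo {

    static List<Integer> run() {
        return Flux.just(1, 2, 2, 3, 4, 4, 5, 6, 8)
                .filter(i -> i % 2 == 0) // 2, 2, 4, 4, 6, 8
                .distinct()              // 2, 4, 6, 8
                .skip(1)                 // 4, 6, 8
                .take(2)                 // 4, 6
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [4, 6]
    }
}
```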
⏱ Timing Operators
Operator | Purpose | Example |
---|---|---|
delayElements | Delay each item | flux.delayElements(Duration.ofSeconds(1)) |
timeout | Signal a TimeoutException if no item arrives within the duration | flux.timeout(Duration.ofSeconds(2)) |
interval | Emit increasing Long values at a fixed period (static factory) | Flux.interval(Duration.ofSeconds(1)) |
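A small sketch of the timing operators, using short millisecond durations so it finishes quickly. The class name TimingDemo, the durations, and the "timed out" fallback string are assumptions made for illustration.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;

public class TimingDemo {

    // interval emits 0, 1, 2, ... on a timer; take(3) stops it after three ticks.
    static List<Long> ticks() {
        return Flux.interval(Duration.ofMillis(10)).take(3).collectList().block();
    }

    // Each element is delayed by 50 ms, well inside the 1 s timeout.
    static List<String> delayed() {
        return Flux.just("a", "b")
                .delayElements(Duration.ofMillis(50))
                .timeout(Duration.ofSeconds(1))
                .collectList()
                .block();
    }

    // The value takes 500 ms but the timeout is 50 ms, so the fallback is used.
    static String tooSlow() {
        return Mono.just("slow")
                .delayElement(Duration.ofMillis(500))
                .timeout(Duration.ofMillis(50))
                .onErrorReturn("timed out")
                .block();
    }

    public static void main(String[] args) {
        System.out.println(ticks());
        System.out.println(delayed());
        System.out.println(tooSlow());
    }
}
```

The calls to block() are only there to keep the demo self-contained; in a real reactive application you would usually return the Flux or Mono instead of blocking.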
🔗 Combining Operators
Operator | Purpose | Example |
---|---|---|
zip | Combine multiple sources element-wise | Flux.zip(f1, f2) |
merge | Merge sources concurrently | Flux.merge(f1, f2) |
concat | Append one after the other | Flux.concat(f1, f2) |
combineLatest | Combine latest values from sources | Flux.combineLatest(...) |
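Here is a hedged sketch contrasting zip, concat, and merge (combineLatest is left out to keep it short). The sources, delays, and the class name CombineDemo are made up for the example.

```java
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;

public class CombineDemo {

    // zip pairs elements by position and stops when the shorter source completes.
    static List<String> zipped() {
        return Flux.zip(Flux.just("a", "b", "c"), Flux.just(1, 2), (letter, n) -> letter + n)
                .collectList()
                .block();
    }

    // concat subscribes to the second source only after the first completes.
    static List<Integer> concatenated() {
        return Flux.concat(Flux.just(1, 2), Flux.just(3, 4)).collectList().block();
    }

    // merge subscribes to both sources eagerly and emits items as they arrive.
    static List<String> merged() {
        Flux<String> slow = Flux.just("slow-1", "slow-2").delayElements(Duration.ofMillis(80));
        Flux<String> fast = Flux.just("fast-1", "fast-2").delayElements(Duration.ofMillis(30));
        return Flux.merge(slow, fast).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(zipped());       // [a1, b2]
        System.out.println(concatenated()); // [1, 2, 3, 4]
        System.out.println(merged());
    }
}
```

The merged list contains all four items, but their order depends on timing, which is the practical difference from concat.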
🧯 Error Handling Operators
Operator | Purpose | Example |
---|---|---|
onErrorReturn | Return fallback value | flux.onErrorReturn("fallback") |
onErrorResume | Switch to another Publisher | flux.onErrorResume(e -> Flux.just("safe")) |
retry | Resubscribe to the source on error, up to n times | flux.retry(3) |
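The three error operators behave differently once an error occurs, which the following sketch tries to show. The failing division and the "flaky" callable are contrived for the demo, and ErrorDemo is an illustrative class name.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ErrorDemo {

    // Fails with ArithmeticException when it reaches 0; the 4 is never processed.
    static Flux<Integer> divide() {
        return Flux.just(1, 2, 0, 4).map(i -> 10 / i);
    }

    // onErrorReturn: replace the error with a single fallback value, then complete.
    static List<Integer> withFallbackValue() {
        return divide().onErrorReturn(-1).collectList().block();
    }

    // onErrorResume: replace the error with another publisher.
    static List<Integer> withFallbackPublisher() {
        return divide().onErrorResume(e -> Flux.just(-1, -2)).collectList().block();
    }

    // retry: resubscribe on error; this source fails twice and succeeds on attempt 3.
    static String withRetry() {
        AtomicInteger attempts = new AtomicInteger();
        return Mono.fromCallable(() -> {
                    if (attempts.incrementAndGet() < 3) {
                        throw new IllegalStateException("flaky");
                    }
                    return "ok after " + attempts.get() + " attempts";
                })
                .retry(3)
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withFallbackValue());     // [10, 5, -1]
        System.out.println(withFallbackPublisher()); // [10, 5, -1, -2]
        System.out.println(withRetry());             // ok after 3 attempts
    }
}
```

Note that an error terminates the original sequence: the fallback operators do not resume it, they substitute something else for the rest.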
🧼 Lifecycle Hooks
Operator | Purpose | Example |
---|---|---|
doOnNext | Side-effect per item | flux.doOnNext(System.out::println) |
doOnComplete | On successful completion | flux.doOnComplete(...) |
doFinally | On any termination | flux.doFinally(...) |
doOnError | On error | flux.doOnError(...) |
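To see when each hook fires, this sketch records the callbacks into a list for a successful source and a failing one. The class name HooksDemo and the log format are assumptions for illustration only.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

public class HooksDemo {

    // Attaches all four hooks to the given source and returns what they recorded.
    static List<String> run(Flux<Integer> source) {
        List<String> log = new ArrayList<>();
        source
                .doOnNext(i -> log.add("next " + i))
                .doOnComplete(() -> log.add("complete"))
                .doOnError(e -> log.add("error " + e.getMessage()))
                .doFinally(signal -> log.add("finally " + signal.name()))
                // An explicit error consumer keeps the failing case from being reported as unhandled.
                .subscribe(i -> { }, e -> { });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(Flux.just(1, 2)));
        System.out.println(run(Flux.error(new RuntimeException("boom"))));
    }
}
```

For the successful source the log is next 1, next 2, complete, finally ON_COMPLETE; for the failing source it is error boom, finally ON_ERROR. doFinally runs in both cases, which makes it a reasonable place for cleanup.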
🧵 Threading (Schedulers)
Operator | Purpose | Example |
---|---|---|
subscribeOn | Choose the scheduler on which the subscription and source emission run | .subscribeOn(Schedulers.boundedElastic()) |
publishOn | Switch the operators after it to another scheduler | .publishOn(Schedulers.parallel()) |
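One way to observe the effect of both operators is to capture thread names before and after publishOn. This is a sketch under the assumption that the default scheduler thread-name prefixes (boundedElastic and parallel) are in use; ThreadingDemo is an illustrative class name.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.List;

public class ThreadingDemo {

    static List<String> run() {
        return Flux.range(1, 2)
                // Runs on the scheduler chosen by subscribeOn, wherever subscribeOn sits in the chain.
                .map(i -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.boundedElastic())
                // Everything after publishOn runs on the parallel scheduler.
                .publishOn(Schedulers.parallel())
                .map(upstream -> upstream + " -> " + Thread.currentThread().getName())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each line of output has the form "boundedElastic-N -> parallel-M", showing the hop between schedulers at the publishOn boundary.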
🧪 Quick Example with Operators:
Flux.range(1, 5)
    .filter(i -> i % 2 == 0) // Keep only even numbers
    .map(i -> i * 10) // Multiply each by 10
    .doOnNext(i -> System.out.println("Mapped: " + i))
    .subscribe(System.out::println);
✅ Output:
Mapped: 20
20
Mapped: 40
40
🧠 Pro Tips
- Operators are lazy: they don’t execute until you .subscribe().
- You can chain operators fluently.
- Use .checkpoint() to debug operator chains.
- Be mindful of operator order: it affects performance and behavior.
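The point about operator order can be illustrated by swapping filter and map in the quick example. This is a minimal sketch; the class name OrderDemo is made up for the demo.

```java
import reactor.core.publisher.Flux;

import java.util.List;

public class OrderDemo {

    // Filter first: only the even inputs (2 and 4) reach map.
    static List<Integer> filterThenMap() {
        return Flux.range(1, 5)
                .filter(i -> i % 2 == 0)
                .map(i -> i * 10)
                .collectList()
                .block();
    }

    // Map first: every input is multiplied by 10, and all results are even, so nothing is filtered out.
    static List<Integer> mapThenFilter() {
        return Flux.range(1, 5)
                .map(i -> i * 10)
                .filter(i -> i % 2 == 0)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(filterThenMap()); // [20, 40]
        System.out.println(mapThenFilter()); // [10, 20, 30, 40, 50]
    }
}
```

Besides changing the result, filtering early also means map runs on fewer elements, which matters when the mapping step is expensive.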