Java.Reative.What are the operators in Project Reactor and what are they used for?

🎯 In Project Reactor, operators are the core tools you use to transform, filter, merge, delay, or otherwise manipulate data as it flows through a reactive pipeline.

Think of operators as the “verbs” in your reactive code — each one defines how the data should be handled.


⚙️ What is an Operator in Project Reactor?

An operator is a method that you chain on a Flux or Mono to define how data should be:

  • Transformed
  • Filtered
  • Delayed
  • Combined
  • Scheduled
  • Handled on error or completion

All operators are lazy — they don’t execute until the Flux or Mono is subscribed to.

🔧 Categories of Common Operators

Let’s break them down with examples:

🔁 Transformation Operators

OperatorPurposeExample
mapTransform each elementflux.map(i -> i * 2)
flatMapAsync transform + flattenflux.flatMap(this::callService)
concatMapOrdered version of flatMapflux.concatMap(...)
switchMapSwitch to new inner streamflux.switchMap(...)

🧹 Filtering Operators

OperatorPurposeExample
filterOnly pass values that match a conditionflux.filter(i -> i % 2 == 0)
distinctRemove duplicatesflux.distinct()
take(n)Take only first n elementsflux.take(3)
skip(n)Skip first n elementsflux.skip(2)

Timing Operators

OperatorPurposeExample
delayElementsDelay each itemflux.delayElements(Duration.ofSeconds(1))
timeoutTimeout on slow itemsflux.timeout(Duration.ofSeconds(2))
intervalEmit items over timeFlux.interval(Duration.ofSeconds(1))

🔗 Combining Operators

OperatorPurposeExample
zipCombine multiple sources element-wiseFlux.zip(f1, f2)
mergeMerge sources concurrentlyFlux.merge(f1, f2)
concatAppend one after the otherFlux.concat(f1, f2)
combineLatestCombine latest values from sourcesFlux.combineLatest(...)

🧯 Error Handling Operators

OperatorPurposeExample
onErrorReturnReturn fallback valueflux.onErrorReturn("fallback")
onErrorResumeSwitch to another Publisherflux.onErrorResume(e -> Flux.just("safe"))
retryRetry on errorflux.retry(3)

🧼 Lifecycle Hooks

OperatorPurposeExample
doOnNextSide-effect per itemflux.doOnNext(System.out::println)
doOnCompleteOn successful completionflux.doOnComplete(...)
doFinallyOn any terminationflux.doFinally(...)
doOnErrorOn errorflux.doOnError(...)

🧵 Threading (Schedulers)

OperatorPurposeExample
subscribeOnControl where upstream runs.subscribeOn(Schedulers.boundedElastic())
publishOnControl where downstream runs.publishOn(Schedulers.parallel())

🧪 Quick Example with Operators:

Flux.range(1, 5)
    .filter(i -> i % 2 == 0)           // Keep only even numbers
    .map(i -> i * 10)                  // Multiply each by 10
    .doOnNext(i -> System.out.println("Mapped: " + i))
    .subscribe(System.out::println);

✅ Output:

Mapped: 20
20
Mapped: 40
40

🧠 Pro Tips

  • Operators are lazy — they don’t execute until you .subscribe().
  • You can chain operators fluently.
  • Use .checkpoint() to debug operator chains.
  • Be mindful of operator order — it affects performance and behavior.
This entry was posted in Без рубрики. Bookmark the permalink.