Java Reactive Flow, which is part of the Reactive Streams specification — the foundation of modern reactive libraries like RxJava, Project Reactor, and Akka Streams.
🌊 What is Java Reactive Flow?
✅ It’s a standard API introduced in Java 9 under:
java.util.concurrent.Flow
It defines a minimal and interoperable Reactive Streams system for asynchronous, non-blocking data streams with backpressure.
📦 Interfaces in java.util.concurrent.Flow
:
public class Flow {
public interface Publisher<T> { void subscribe(Subscriber<? super T> s); }
public interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T item);
void onError(Throwable t);
void onComplete();
}
public interface Subscription { void request(long n); void cancel(); }
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
}
Let’s break them down 👇
🧩 Core Components
Component | Role |
---|---|
Publisher | Emits data (e.g. sensor, API, file read) |
Subscriber | Receives data |
Subscription | Controls the flow: request(n) is how backpressure works |
Processor | Both a subscriber and a publisher (useful for data transformation in between) |
🔁 Example Lifecycle:
- Publisher calls
subscribe()
and gives the Subscriber a Subscription. - Subscriber uses
subscription.request(n)
to say, “I’m ready for n items.” - Publisher sends
onNext()
events up ton
times. - If done → calls
onComplete()
If error → callsonError(Throwable)
✅ Minimal Example:
import java.util.concurrent.Flow.*;
public class MyReactiveFlow {
public static void main(String[] args) {
Publisher<Integer> publisher = subscriber -> {
subscriber.onSubscribe(new Subscription() {
int count = 0;
boolean canceled = false;
@Override
public void request(long n) {
for (int i = 0; i < n && !canceled; i++) {
subscriber.onNext(count++);
}
subscriber.onComplete();
}
@Override
public void cancel() {
canceled = true;
}
});
};
Subscriber<Integer> subscriber = new Subscriber<>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("Subscribed");
s.request(5); // backpressure: I want 5 items
}
@Override
public void onNext(Integer item) {
System.out.println("Received: " + item);
}
@Override
public void onError(Throwable t) {
System.out.println("Error: " + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
};
publisher.subscribe(subscriber);
}
}
💡 Why Does This Matter?
- Interoperability — Libraries like RxJava, Reactor, Akka Streams implement this spec.
- Backpressure support — Ensures producers don’t overwhelm slow consumers.
- Standards-based — Java developers can now write reactive systems with a shared model.
🛠 Libraries That Implement It:
Library | Notes |
---|---|
RxJava 2/3 | Has Flowable , Publisher , and adapters |
Project Reactor | Flux , Mono extend Publisher |
Spring WebFlux | Built entirely on top of Reactive Streams |
Akka Streams | Fully compliant with the spec |