Java.Multithreading. What is Java Reactive Flow?

Java Reactive Flow is the JDK's own copy of the Reactive Streams interfaces, the specification that underpins modern reactive libraries like RxJava, Project Reactor, and Akka Streams.


🌊 What is Java Reactive Flow?

✅ It’s a standard API introduced in Java 9 under:

java.util.concurrent.Flow

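It defines a minimal, interoperable set of interfaces for asynchronous, non-blocking data streams with backpressure. The interfaces are only a contract: apart from SubmissionPublisher, the JDK leaves the implementations to you or to a library.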

📦 Interfaces in java.util.concurrent.Flow:

public final class Flow {
    public interface Publisher<T> { void subscribe(Subscriber<? super T> s); }
    public interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T item);
        void onError(Throwable t);
        void onComplete();
    }
    public interface Subscription { void request(long n); void cancel(); }
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
}

Let’s break them down 👇


🧩 Core Components

Component       Role
Publisher       Emits data (e.g. sensor, API, file read)
Subscriber      Receives data
Subscription    Controls the flow: request(n) is how backpressure works
Processor       Both a subscriber and a publisher (useful for data transformation in between)
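
To make the Processor role concrete, here is a minimal sketch of a mapping stage. MapProcessor and ProcessorDemo are hypothetical names made up for this illustration. The sketch leans on the JDK's SubmissionPublisher for the publishing half and pulls one item at a time from upstream; a production-grade processor would need more careful demand handling.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical helper: applies `mapper` to each incoming item and republishes the result.
class MapProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MapProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        s.request(1);                      // pull the first item
    }
    @Override public void onNext(T item) {
        submit(mapper.apply(item));        // forward the transformed item downstream
        upstream.request(1);               // then ask upstream for one more
    }
    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
}

class ProcessorDemo {
    // Wires source -> processor -> collecting subscriber and returns what arrived.
    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            MapProcessor<Integer, String> toLabel = new MapProcessor<>(i -> "item-" + i);
            source.subscribe(toLabel);
            toLabel.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { received.add(item); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            for (int i = 1; i <= 3; i++) {
                source.submit(i);
            }
        } // closing the source sends onComplete, which the processor passes on

        try {
            done.await(5, TimeUnit.SECONDS); // delivery is asynchronous, so wait for the end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The collecting subscriber asks for Long.MAX_VALUE, which effectively switches backpressure off for that last hop; that is fine for a demo but usually not what you want against a fast source.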

🔁 Example Lifecycle:

  1. The Subscriber is passed to publisher.subscribe(); the Publisher responds by calling onSubscribe() and handing the Subscriber a Subscription.
  2. Subscriber uses subscription.request(n) to say, “I’m ready for n items.”
  3. Publisher sends onNext() events up to n times.
  4. If done → calls onComplete()
     If error → calls onError(Throwable)
     Either signal is terminal: nothing may be sent after it.
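
The four steps above can be traced with a small recording subscriber. This is only a sketch: LifecycleTrace is a made-up name, and the JDK's SubmissionPublisher stands in for a real data source.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class LifecycleTrace {
    // Records every signal the subscriber sees, in arrival order.
    static List<String> run() {
        List<String> trace = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                trace.add("onSubscribe");
                s.request(2);                       // step 2: demand for two items
            }
            @Override public void onNext(String item) {
                trace.add("onNext(" + item + ")");  // step 3
            }
            @Override public void onError(Throwable t) {
                trace.add("onError");
                finished.countDown();
            }
            @Override public void onComplete() {
                trace.add("onComplete");            // step 4
                finished.countDown();
            }
        };

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);        // step 1
            publisher.submit("a");
            publisher.submit("b");
        }                                           // close() triggers onComplete

        try {
            finished.await(5, TimeUnit.SECONDS);    // signals arrive on another thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // prints [onSubscribe, onNext(a), onNext(b), onComplete]
    }
}
```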

✅ Minimal Example:

import java.util.concurrent.Flow.*;

public class MyReactiveFlow {
    public static void main(String[] args) {
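        // Toy synchronous publisher: emits 0..4 on demand, then completes exactly once.
        Publisher<Integer> publisher = subscriber -> {
            final int limit = 5; // total number of items this stream holds
            subscriber.onSubscribe(new Subscription() {
                int count = 0;
                boolean done = false; // true after cancel, completion, or error

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) { // Reactive Streams requires positive demand
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request(n) needs n > 0"));
                        return;
                    }
                    for (long i = 0; i < n && !done && count < limit; i++) {
                        subscriber.onNext(count++);
                    }
                    if (!done && count == limit) { // complete only when the data runs out
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };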

        Subscriber<Integer> subscriber = new Subscriber<>() {
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("Subscribed");
                s.request(5); // backpressure: I want 5 items
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("Received: " + item);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("Error: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Completed");
            }
        };

        publisher.subscribe(subscriber);
    }
}
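
The subscriber above asks for five items in one go. A more typical backpressure pattern is to pull one item at a time and cancel once you have enough. Here is a hedged sketch of that idea; TakeThree is a hypothetical name, and SubmissionPublisher is used as the source so the example stays short.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class TakeThree {
    // Pulls items one by one and cancels after the third.
    static List<Integer> run() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);                    // ask for exactly one item
            }
            @Override public void onNext(Integer item) {
                received.add(item);
                if (received.size() == 3) {
                    subscription.cancel();       // enough: stop the stream
                    finished.countDown();
                } else {
                    subscription.request(1);     // ready for the next one
                }
            }
            @Override public void onError(Throwable t) { finished.countDown(); }
            @Override public void onComplete() { finished.countDown(); }
        };

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 10; i++) {
                publisher.submit(i);             // ten items are on offer, only three are taken
            }
            try {
                finished.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3]
    }
}
```

Because no item is delivered without outstanding demand, the remaining seven items never reach onNext.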

💡 Why Does This Matter?

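  • Interoperability: RxJava, Reactor, and Akka Streams implement the Reactive Streams spec, whose four interfaces java.util.concurrent.Flow mirrors one-to-one, so streams can be bridged between them with thin adapters.
  • Backpressure support: the consumer sets the pace through request(n), so producers don’t overwhelm slow consumers.
  • Standards-based: Java developers can now write reactive systems with a shared model.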
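
On the producer side, backpressure means deciding what to do when a consumer is not keeping up. As a rough sketch (DropWhenSaturated is a made-up name, and the buffer size of 4 is an arbitrary choice for the demo), SubmissionPublisher.offer() can be used to drop items instead of blocking when a subscriber's buffer is full:

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

class DropWhenSaturated {
    // Offers `total` items to a subscriber that never requests any
    // and returns how many of those offers were dropped.
    static int run(int total) {
        Flow.Subscriber<Integer> stalled = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { /* never calls request(n) */ }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };

        int dropped = 0;
        // Small per-subscriber buffer; the JDK may round the capacity up to a power of two.
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), 4)) {
            publisher.subscribe(stalled);
            for (int i = 0; i < total; i++) {
                // offer() does not block; a negative result is the negated number of drops.
                int result = publisher.offer(i, null);
                if (result < 0) {
                    dropped += -result;
                }
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println("Dropped: " + run(10));
    }
}
```

By contrast, submit() would block the producer until buffer space frees up. Which of the two fits depends on whether losing items or stalling the producer is the lesser evil for your use case.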

🛠 Libraries That Implement It:

Library            Notes
RxJava 2/3         Flowable implements Publisher; adapters are available
Project Reactor    Flux and Mono implement Publisher
Spring WebFlux     Built entirely on top of Reactive Streams (via Project Reactor)
Akka Streams       Fully compliant with the spec