Java.Reactive.What is the role of Subscription in reactive programming?

the Subscription plays a pivotal role in managing flow control and resource management in reactive programming.

Let’s break it down 👇


🎯 What is a Subscription?

In Reactive Streams (Java 9 Flow API, RxJava, Reactor, Akka Streams), a Subscription is the link between the Publisher (data producer) and the Subscriber (data consumer).

It gives the Subscriber control over:

  • How much data to receive (request(n))
  • When to stop receiving (cancel())

Think of it like a remote control 📺:
The subscriber can press “Give me more” or “Stop” at any time.

🧩 Core Responsibilities of Subscription

MethodPurpose
request(long n)Tells the Publisher: “I’m ready for n more items.” This enables backpressure.
cancel()Stops the data flow and unsubscribes the observer. Useful for cleanup and preventing memory leaks.

🧪 Example: Manual Control

import java.util.concurrent.Flow.*;

public class CustomReactiveFlow {
    public static void main(String[] args) {
        Publisher<Integer> publisher = subscriber -> {
            subscriber.onSubscribe(new Subscription() {
                int count = 0;
                boolean cancelled = false;

                @Override
                public void request(long n) {
                    for (int i = 0; i < n && !cancelled; i++) {
                        subscriber.onNext(count++);
                    }
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    cancelled = true;
                    System.out.println("Subscription cancelled");
                }
            });
        };

        Subscriber<Integer> subscriber = new Subscriber<>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription s) {
                this.subscription = s;
                System.out.println("Subscribed");
                s.request(5); // ask for 5 items
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("Received: " + item);
            }

            @Override
            public void onError(Throwable t) {}

            @Override
            public void onComplete() {
                System.out.println("Done");
            }
        };

        publisher.subscribe(subscriber);
    }
}

🧠 Why is Subscription So Important?

Backpressure: Without request(n), fast publishers can overwhelm slow consumers.

Laziness: Publishers don’t start emitting until request() is called — promotes efficiency.

Cancellation: Prevents memory leaks and unnecessary computation.

Resource management: Like closing files, sockets, or unsubscribing from events.

🔗 Where You See Subscription

LibraryRole of Subscription
Java Flow APIManual request() and cancel()
RxJavaAbstracted as Disposable
Project ReactorImplicit with BaseSubscriber, or accessed via .doOnSubscribe(...)
Spring WebFluxAutomatically handled — but still honors request(n) from clients
Akka StreamsBackpressure protocol handled under the hood via demand signaling

🧠 Why is Subscription So Important?

Backpressure: Without request(n), fast publishers can overwhelm slow consumers.

Laziness: Publishers don’t start emitting until request() is called — promotes efficiency.

Cancellation: Prevents memory leaks and unnecessary computation.

Resource management: Like closing files, sockets, or unsubscribing from events.


🔗 Where You See Subscription

LibraryRole of Subscription
Java Flow APIManual request() and cancel()
RxJavaAbstracted as Disposable
Project ReactorImplicit with BaseSubscriber, or accessed via .doOnSubscribe(...)
Spring WebFluxAutomatically handled — but still honors request(n) from clients
Akka StreamsBackpressure protocol handled under the hood via demand signaling
This entry was posted in Без рубрики. Bookmark the permalink.