Java.Reactive.How to unsubscribe from a stream to prevent memory leaks?

Let’s look at how to unsubscribe from a stream in different reactive libraries to avoid memory leaks or resource hogging.


🧼 Why Unsubscribe?

Streams may emit:

  • Forever (e.g., mouse clicks, websocket events)
  • Or slow, bursty data (e.g., Kafka, sensors)

If you don’t unsubscribe, subscribers stay in memory, keep receiving events, and:

  • Memory leaks occur
  • Threads/processors are wasted
  • Resources (DB, sockets, etc.) stay open

✅ How to Unsubscribe by Library


1️⃣ RxJava

You use a Disposable object to unsubscribe.

🔧 Example:

Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(i -> System.out.println("Received: " + i));

// Later, when done
disposable.dispose();

📌 Notes:

  • dispose() stops the flow and removes the subscriber.
  • Always dispose() in onDestroy() (Android) or shutdown hooks (server apps).

2️⃣ Project Reactor (Reactor Core)

You get a Disposable or use BaseSubscriber to cancel().

🔧 Example:

Disposable disposable = Flux.interval(Duration.ofSeconds(1))
    .subscribe(i -> System.out.println("Tick: " + i));

// Unsubscribe later
disposable.dispose();

Or with manual control:

BaseSubscriber<Long> subscriber = new BaseSubscriber<>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(Long.MAX_VALUE); // or control backpressure
    }

    @Override
    protected void hookOnNext(Long value) {
        System.out.println("Got: " + value);
    }
};

// Later: unsubscribe
subscriber.cancel();

3️⃣ Spring WebFlux

Spring WebFlux auto-handles subscription lifecycle per HTTP request.

But if you’re streaming or subscribing inside a service, use .doFinally() to clean up.

🔧 Example:

Flux.interval(Duration.ofSeconds(1))
    .doOnCancel(() -> System.out.println("Stream canceled"))
    .doFinally(signal -> System.out.println("Final signal: " + signal))
    .subscribe();

Use .take(n) or .timeout(...) to auto-complete a stream.

4️⃣ Akka Streams

Streams are materialized into a Cancellable or KillSwitch.

🔧 Example:

val cancellable = Source.tick(1.second, 1.second, "tick")
  .to(Sink.foreach(println))
  .run()

// Later
cancellable.cancel()

Or with shared cancellation:

val killSwitch = KillSwitches.shared("my-switch")
val source = Source.tick(...).via(killSwitch.flow)
killSwitch.shutdown() // stop all flows

🛠 Pro Tips

TipWhy
Use .take(n) or .timeout(...)Auto-unsubscribes when data limit or time passes
Use .doFinally(...)Clean up resources (close file, release connection)
Store and manage Disposable or Subscription referencesUnsubscribe explicitly when no longer needed
Combine with lifecycle hooks (Spring, Android, etc.)Prevent leaks during shutdowns or navigation
This entry was posted in Без рубрики. Bookmark the permalink.