Let’s look at how to unsubscribe from a stream in different reactive libraries to avoid memory leaks or resource hogging.
🧼 Why Unsubscribe?
Streams may emit:
- Forever (e.g., mouse clicks, websocket events)
- Or slow, bursty data (e.g., Kafka, sensors)
If you don’t unsubscribe, subscribers stay in memory and keep receiving events, which means:
- Memory leaks occur
- Threads/processors are wasted
- Resources (DB, sockets, etc.) stay open
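To make the leak concrete, here is a minimal sketch using only the JDK. `EventSource` is a hypothetical hand-rolled class, not part of RxJava, Reactor, or Akka; it only illustrates that a source holds a reference to each subscriber until that subscriber is removed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical hand-rolled event source, not part of any reactive library. */
public class EventSource<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    /** Registers a subscriber and returns a handle that removes it again. */
    public Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    /** Delivers an event to every subscriber that is still registered. */
    public void emit(T event) {
        subscribers.forEach(s -> s.accept(event));
    }

    /** Number of subscribers the source still holds a reference to. */
    public int subscriberCount() {
        return subscribers.size();
    }

    public static void main(String[] args) {
        EventSource<String> clicks = new EventSource<>();
        List<String> seen = new ArrayList<>();

        Runnable unsubscribe = clicks.subscribe(seen::add);
        clicks.emit("click-1");   // delivered
        unsubscribe.run();        // the source drops its reference
        clicks.emit("click-2");   // no longer delivered

        System.out.println(seen);                     // prints [click-1]
        System.out.println(clicks.subscriberCount()); // prints 0
    }
}
```

If `unsubscribe.run()` were skipped, the source would keep the subscriber (and everything it captures) reachable and would keep calling it on every event.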
✅ How to Unsubscribe by Library
1️⃣ RxJava
You use a Disposable object to unsubscribe.
🔧 Example:
Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(i -> System.out.println("Received: " + i));
// Later, when done
disposable.dispose();
📌 Notes:
- dispose() stops the flow and removes the subscriber.
- Always call dispose() in onDestroy() (Android) or shutdown hooks (server apps).
2️⃣ Project Reactor (Reactor Core)
You get a Disposable from subscribe(), or use a BaseSubscriber and call cancel() on it.
🔧 Example:
Disposable disposable = Flux.interval(Duration.ofSeconds(1))
    .subscribe(i -> System.out.println("Tick: " + i));
// Unsubscribe later
disposable.dispose();
Or with manual control:
BaseSubscriber<Long> subscriber = new BaseSubscriber<>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(Long.MAX_VALUE); // or control backpressure
    }
    @Override
    protected void hookOnNext(Long value) {
        System.out.println("Got: " + value);
    }
};
// Attach the subscriber to a stream; without this it never receives anything
Flux.interval(Duration.ofSeconds(1)).subscribe(subscriber);
// Later: unsubscribe
subscriber.cancel();
3️⃣ Spring WebFlux
Spring WebFlux auto-handles subscription lifecycle per HTTP request.
But if you’re streaming or subscribing inside a service, use .doFinally() to clean up.
🔧 Example:
Flux.interval(Duration.ofSeconds(1))
    .doOnCancel(() -> System.out.println("Stream canceled"))
    .doFinally(signal -> System.out.println("Final signal: " + signal))
    .subscribe();
Use .take(n) to complete a stream after n items, or .timeout(...) to end it with a TimeoutException when no item arrives in time.
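The mechanism behind a take(n)-style limit can be sketched with the JDK's java.util.concurrent.Flow interfaces. This is not Reactor's implementation; `TakeSubscriber` is a hypothetical class that only shows the idea: count items, then cancel the upstream subscription so the source stops emitting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

/** Hypothetical subscriber that cancels upstream after a fixed number of items. */
public class TakeSubscriber<T> implements Flow.Subscriber<T> {
    private final long limit;
    private final Consumer<T> downstream;
    private Flow.Subscription upstream;
    private long seen;
    private boolean done;

    public TakeSubscriber(long limit, Consumer<T> downstream) {
        this.limit = limit;
        this.downstream = downstream;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        if (limit <= 0) {
            done = true;
            s.cancel();          // nothing wanted: cancel right away
        } else {
            s.request(limit);    // never ask for more than will be used
        }
    }

    @Override
    public void onNext(T item) {
        if (done) return;        // ignore items that arrive after the limit
        downstream.accept(item);
        if (++seen >= limit) {
            done = true;
            upstream.cancel();   // the "auto-unsubscribe" step
        }
    }

    @Override public void onError(Throwable t) { done = true; }
    @Override public void onComplete() { done = true; }

    public boolean isDone() { return done; }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        boolean[] cancelled = {false};
        TakeSubscriber<Integer> take = new TakeSubscriber<>(2, out::add);
        // Stand-in subscription that only records whether cancel() was called.
        take.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { cancelled[0] = true; }
        });
        for (int i = 1; i <= 5; i++) take.onNext(i); // simulate a chatty source
        System.out.println(out + " cancelled=" + cancelled[0]); // prints [1, 2] cancelled=true
    }
}
```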
4️⃣ Akka Streams
Some sources, such as Source.tick, materialize into a Cancellable; any stream can also be stopped through a KillSwitch.
🔧 Example:
val cancellable = Source.tick(1.second, 1.second, "tick")
  .to(Sink.foreach(println))
  .run()
// Later
cancellable.cancel()
Or with shared cancellation:
val killSwitch = KillSwitches.shared("my-switch")
val source = Source.tick(...).via(killSwitch.flow)
killSwitch.shutdown() // stop all flows
🛠 Pro Tips
Tip | Why |
---|---|
Use .take(n) or .timeout(...) | Ends the stream automatically after n items, or with an error when no item arrives in time |
Use .doFinally(...) | Clean up resources (close file, release connection) |
Store and manage Disposable or Subscription references | Unsubscribe explicitly when no longer needed |
Combine with lifecycle hooks (Spring, Android, etc.) | Prevent leaks during shutdowns or navigation |
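
The "store references and tie them to a lifecycle hook" tips can be combined in one small helper. The sketch below uses only the JDK's Flow.Subscription; `SubscriptionBag` is a hypothetical name, loosely modeled on the idea behind RxJava's CompositeDisposable, and is an illustration rather than a drop-in replacement for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: gathers subscriptions so one lifecycle hook can cancel them all. */
public class SubscriptionBag implements AutoCloseable {
    private final List<Flow.Subscription> subscriptions = new ArrayList<>();
    private boolean closed;

    /** Tracks a subscription; cancels it immediately if the bag is already closed. */
    public synchronized void add(Flow.Subscription subscription) {
        if (closed) {
            subscription.cancel();
        } else {
            subscriptions.add(subscription);
        }
    }

    /** Cancels every tracked subscription; safe to call more than once. */
    @Override
    public synchronized void close() {
        closed = true;
        subscriptions.forEach(Flow.Subscription::cancel);
        subscriptions.clear();
    }

    public static void main(String[] args) {
        AtomicInteger cancelled = new AtomicInteger();
        // Stand-in subscription that only counts cancel() calls.
        Flow.Subscription fake = new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { cancelled.incrementAndGet(); }
        };

        try (SubscriptionBag bag = new SubscriptionBag()) {
            bag.add(fake);
            bag.add(fake);
        } // close() runs here, where onDestroy() or a shutdown hook would call it
        System.out.println(cancelled.get()); // prints 2
    }
}
```

Cancelling late additions immediately is a deliberate choice in this sketch: a subscription registered after shutdown has begun would otherwise never be cleaned up.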