https://stackblitz.com/edit/rxjs-pranay-example?file=index.ts
takeUntil
My favorite way of unsubscribing from streams is takeUntil
. This operator will allow you to unsubscribe a stream when the stream passed as input to the operator emits an event. Oh, that seems too complicated, but it’s actually not.
Let’s see an example:
- we have two observables that emit with an interval of respectively 1000 ms (1 second) and 100 ms
- the stream that emits every 100ms will unsubscribe when the other stream emits, which will happen every 1 second
import { of, interval } from 'rxjs';
import { map, takeUntil, takeWhile } from 'rxjs/operators';
const source = of('World').pipe(map(x => `Hello ${x}!`));
source.subscribe(console.log);
//takeUntil
const slow$ = interval(1000); // emit every 100 ms
const fast$ = interval(100).pipe(takeUntil(slow$));
fast$.subscribe({
next(n) {
console.log(n);
},
complete() {
console.log('I am unsubscribed!');
}
});
slow$.subscribe();
takeWhile
This operator is very useful to unsubscribe streams based on their own value. One of the ways I needed to use this operator is to stop certain timers once they reach a certain number of iterations. For instance, a countdown timer.
In the following example, I want to stop a timer once it iterates for 5 times. —
- the
takeWhile
operator accepts a predicate function whose parameter is the current value of the stream - if the predicate is truthy, it will keep emitting values; if it’s falsy, then it will unsubscribe the stream
import { of, interval } from 'rxjs';
import { map, takeUntil, takeWhile } from 'rxjs/operators';
const source = of('World').pipe(map(x => `Hello ${x}!`));
source.subscribe(console.log);
// takeWhile
const stream$ = interval(1000).pipe(takeWhile(n => n < 5));
stream$.subscribe({
next(n) {
console.log(n);
},
complete() {
console.log('I am unsubscribed!');
}
});
What are Subjects?
Subject is both an observable and observer.
- Observer — it has the next, error, and complete methods.
- Observable — it has all the observable operators, and you can subscribe to him.
A subject can act as a bridge/proxy between the source observable and many observers, making it possible for multiple observers to share the same observable execution.
remember that a subject is also an observer, and what observers can do? They can listen to observables with the next()
, error()
and complete()
methods.