Showing posts with label rxjs. Show all posts
Showing posts with label rxjs. Show all posts

Sunday, 16 January 2022

BehaviorSubject and ReplaySubject in Rxjs

 

  • BehaviorSubject

A variant of Subject that requires an initial value and emits its current value whenever it is subscribed to.

  • ReplaySubject

A variant of Subject that “replays” or emits old values to new subscribers. It buffers a set number of values and will emit those values immediately to any new subscribers in addition to emitting new values to existing subscribers.

Sunday, 21 November 2021

Are observables synchronous or asynchronous?

 1.Synchronous observable:


Observables are lazy Push collections of multiple values.

In our async operator we've used setTimeout, this setTimeout will assure that the main thread will not get blocked because setTimeout
is a browser API and our code will be moved to the event loop.

const simpleBlockingOperator =
(noOfLoops: number) =>
<T>(source: Observable<T>): Observable<T> => {
return new Observable((observable) => {
source.subscribe({
next: (no) => {
let loops = 0;
while (loops !== noOfLoops) {
loops++;
}
console.log("Done loooping" + loops, " ", noOfLoops);
return observable.next(no);
},
error: (error) => observable.error(error),
complete: () => observable.complete(),
});
});
};
2.Asynchronous observable:

const simpleNonBlockingOperator =
(noOfLoops: number) =>
<T>(source: Observable<T>): Observable<T> => {
return new Observable((observable) => {
source.subscribe({
next: (no) => {
setTimeout(() => {
let loops = 0;
while (loops !== noOfLoops) {
loops++;
}
console.log("Done loooping" + loops, " ", noOfLoops);
return observable.next(no);
}, 0);
},
error: (error) => observable.error(error),
complete: () => observable.complete(),
});
});
};



Sunday, 11 July 2021

rxjs takeUntil and takeWhile Example


https://stackblitz.com/edit/rxjs-pranay-example?file=index.ts


takeUntil

My favorite way of unsubscribing from streams is takeUntil. This operator will allow you to unsubscribe a stream when the stream passed as input to the operator emits an event. Oh, that seems too complicated, but it’s actually not.

Let’s see an example:

  • we have two observables that emit with an interval of respectively 1000 ms (1 second) and 100 ms
  • the stream that emits every 100ms will unsubscribe when the other stream emits, which will happen every 1 second



import { of, interval } from 'rxjs';
import { map, takeUntil, takeWhile } from 'rxjs/operators';

const source = of('World').pipe(map(x => `Hello ${x}!`));

source.subscribe(console.log);

//takeUntil
const slow$ = interval(1000); // emit every 100 ms

const fast$ = interval(100).pipe(takeUntil(slow$));

fast$.subscribe({
next(n) {
console.log(n);
},
complete() {
console.log('I am unsubscribed!');
}
});

slow$.subscribe();



takeWhile

This operator is very useful to unsubscribe streams based on their own value. One of the ways I needed to use this operator is to stop certain timers once they reach a certain number of iterations. For instance, a countdown timer.

In the following example, I want to stop a timer once it iterates for 5 times. —

  • the takeWhile operator accepts a predicate function whose parameter is the current value of the stream
  • if the predicate is truthy, it will keep emitting values; if it’s falsy, then it will unsubscribe the stream


import { of, interval } from 'rxjs';
import { map, takeUntil, takeWhile } from 'rxjs/operators';

const source = of('World').pipe(map(x => `Hello ${x}!`));

source.subscribe(console.log);


// takeWhile
const stream$ = interval(1000).pipe(takeWhile(n => n < 5));

stream$.subscribe({
next(n) {
console.log(n);
},
complete() {
console.log('I am unsubscribed!');
}
});



What are Subjects?

Subject is both an observable and observer.

  1. Observer — it has the next, error, and complete methods.
  2. Observable — it has all the observable operators, and you can subscribe to him.

A subject can act as a bridge/proxy between the source observable and many observers, making it possible for multiple observers to share the same observable execution.

remember that a subject is also an observer, and what observers can do? They can listen to observables with the next()error() and complete() methods.