Emit numbers in sequence at a fixed interval using interval and timer in RxJS

interval

// RxJS v6+
import { interval } from 'rxjs';

//emit value in sequence every 1 second
const source = interval(1000);
//output: 0,1,2,3,4,5....
const subscribe = source.subscribe(val => console.log(val));

timer

// RxJS v6+
import { timer } from 'rxjs';

/*
  timer takes a second argument, how often to emit subsequent values
  in this case we will emit first value after 1 second and subsequent
  values every 2 seconds after
*/
const source = timer(1000, 2000);
//output: 0,1,2,3,4,5......
const subscribe = source.subscribe(val => console.log(val));

// RxJS v6+
import { timer } from 'rxjs';

//emit 0 after 1 second then complete, since no second argument is supplied
const source = timer(1000);
//output: 0
const subscribe = source.subscribe(val => console.log(val));
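
The first argument of timer can also be 0, which is one way to get an interval-like stream whose first value arrives right away instead of after a full period. A minimal sketch, assuming RxJS v6+; the VirtualTimeScheduler is an addition used only so the example runs instantly and can record the virtual time of each emission. Omit the scheduler argument to use real timers.

```typescript
// RxJS v6+
import { timer, VirtualTimeScheduler } from 'rxjs';
import { take } from 'rxjs/operators';

// virtual clock (assumption: used only to make this sketch deterministic)
const scheduler = new VirtualTimeScheduler();
const emissions: string[] = [];

// first value after 0 ms, subsequent values every 1000 ms;
// take(3) completes the stream after three values
timer(0, 1000, scheduler)
  .pipe(take(3))
  .subscribe(val => emissions.push(`${scheduler.now()}ms: ${val}`));

// run all scheduled virtual-time work synchronously
scheduler.flush();

//output: 0ms: 0, 1000ms: 1, 2000ms: 2
console.log(emissions.join(', '));
```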

References
https://stackoverflow.com/questions/44165893/how-do-i-make-an-observable-interval-start-immediately-without-a-delay
https://www.learnrxjs.io/operators/creation/interval.html
https://www.learnrxjs.io/operators/creation/timer.html
https://rxjs.dev/api/index/function/interval
https://rxjs.dev/api/index/function/timer

Detect window size changes using RxJS debounce in Angular

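import { HostListener } from '@angular/core';
import { Observable, Subject, interval } from 'rxjs';
import { debounce } from 'rxjs/operators';

// the members below belong inside an Angular component class
sizeChanged: Subject<boolean>;
sizeChangedDebounced: Observable<boolean>;

ngOnInit() {
  // show the chart on init and again after each burst of size changes
  this.showChart();
  this.sizeChanged = new Subject<boolean>();
  // emit only after resize events have been quiet for 1 second
  this.sizeChangedDebounced = this.sizeChanged.pipe(debounce(() => interval(1000)));
  this.sizeChangedDebounced.subscribe(() => {
    this.showChart();
  });
}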

showChart() {
  // do something
}

@HostListener('window:resize', ['$event'])
onResize(event?) {
  this.sizeChanged.next(true);
}

debounce example

import { fromEvent, interval } from 'rxjs';
import { debounce } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
// emit a click only after 1 second has passed without another click
const result = clicks.pipe(debounce(() => interval(1000)));
result.subscribe(x => console.log(x));
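
When the quiet period is a fixed duration, the debounceTime operator expresses the same idea as debounce(() => interval(1000)) more directly. A minimal sketch, assuming RxJS v6+; a Subject stands in for the click or resize events, and the VirtualTimeScheduler is an addition used only so the example runs instantly (leave out the second argument of debounceTime to use real timers).

```typescript
import { Subject, VirtualTimeScheduler } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

// virtual clock (assumption: used only to make this sketch deterministic)
const scheduler = new VirtualTimeScheduler();
const events = new Subject<number>();
const received: number[] = [];

// emit the latest value only after 1000 ms without a newer one
events.pipe(debounceTime(1000, scheduler)).subscribe(x => received.push(x));

// a burst of three events with no pause between them
events.next(1);
events.next(2);
events.next(3);

// advance virtual time until nothing is left to run
scheduler.flush();

// output: 3
console.log(received.join(','));
```

Only the last value of the burst gets through, which is why a resize handler built this way redraws once per burst rather than once per event.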

References
https://rxjs-dev.firebaseapp.com/api/operators/debounce

Call next() from outside of the Observable using Subject

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(1);
subject.next(2);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

References
https://stackoverflow.com/questions/41859189/angular2-observable-how-to-call-next-from-outside-of-the-observables-construc
https://rxjs-dev.firebaseapp.com/guide/subject

Concat vs. merge in RxJS

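merge subscribes to all of its source streams at once, so their outputs can interleave; concat subscribes to one stream at a time and waits for it to complete before moving on to the next. The snippets below use RxJava syntax; the RxJS functions of the same name behave the same way.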

Observable.merge(
        Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
        Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
.subscribe(System.out::println);
// output: A0 B0 A1 B1 B2 A2 B3 A3 B4 A4

Observable.concat(
        Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
        Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
.subscribe(System.out::println);
// output: A0 A1 A2 A3 A4 A5 A6 A7 A8
// (the first interval never completes, so the B stream is never subscribed)
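
For comparison in RxJS itself, here is a minimal sketch of the same two calls, assuming RxJS v6+. take(3) is added so that each stream completes and concat can move on to the second one; the VirtualTimeScheduler is an addition used only to make the output instant and repeatable (drop it to use real one-second timers).

```typescript
import { concat, interval, merge, VirtualTimeScheduler } from 'rxjs';
import { map, take } from 'rxjs/operators';

// virtual clock (assumption: used only to make this sketch deterministic)
const scheduler = new VirtualTimeScheduler();

// three labelled values, one per second, e.g. A0, A1, A2
const labelled = (label: string) =>
  interval(1000, scheduler).pipe(
    take(3),
    map(id => `${label}${id}`)
  );

const merged: string[] = [];
const concatenated: string[] = [];

// merge subscribes to both streams immediately, so values interleave
merge(labelled('A'), labelled('B')).subscribe(v => merged.push(v));
// concat subscribes to B only after A has completed
concat(labelled('A'), labelled('B')).subscribe(v => concatenated.push(v));

// run all scheduled virtual-time work synchronously
scheduler.flush();

// output: A0 B0 A1 B1 A2 B2
console.log(merged.join(' '));
// output: A0 A1 A2 B0 B1 B2
console.log(concatenated.join(' '));
```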

References
https://stackoverflow.com/questions/38903094/concat-vs-merge-operator
https://rxjs.dev/api/index/function/merge
https://rxjs.dev/api/index/function/concat

Producer/Consumer in RxJS

// producer: wrap the socket's 'alarm' events in an Observable
this.newAlarm = new Observable<SocketAlarmDto>(observer => {
  socket.on('alarm', (data: SocketAlarmDto) => {
    observer.next(data);
  });
});

// consumer: for each alarm, run the two steps one after the other
this.newAlarmSubscription = this.socket.newAlarm.subscribe((data: SocketAlarmDto) => {
  async.series([
    (callback => this.syncAlarms(callback)),
    (callback => this.showAlarms(callback))
  ]);
});

// stop consuming once alarms are no longer needed
this.newAlarmSubscription.unsubscribe();
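
The producer above never removes its 'alarm' listener, so the handler stays registered after the consumer unsubscribes. One common remedy is to return a teardown function from the Observable constructor. The sketch below assumes RxJS v6+ and uses a small hypothetical FakeSocket class as a stand-in for the real socket; its on/off/emit methods are assumptions, not the API of any particular library.

```typescript
import { Observable } from 'rxjs';

type Handler = (data: string) => void;

// hypothetical stand-in for the real socket (not part of any library)
class FakeSocket {
  private handlers = new Map<string, Handler[]>();

  on(event: string, handler: Handler): void {
    this.handlers.set(event, [...this.listeners(event), handler]);
  }
  off(event: string, handler: Handler): void {
    this.handlers.set(event, this.listeners(event).filter(h => h !== handler));
  }
  emit(event: string, data: string): void {
    this.listeners(event).forEach(h => h(data));
  }
  listeners(event: string): Handler[] {
    return this.handlers.get(event) || [];
  }
}

const socket = new FakeSocket();

// producer: forwards 'alarm' events and cleans up after itself
const newAlarm = new Observable<string>(observer => {
  const handler: Handler = data => observer.next(data);
  socket.on('alarm', handler);
  // teardown: runs when the subscriber unsubscribes
  return () => socket.off('alarm', handler);
});

const received: string[] = [];
// consumer
const subscription = newAlarm.subscribe(data => received.push(data));

socket.emit('alarm', 'door open');
subscription.unsubscribe();
socket.emit('alarm', 'ignored'); // no listener is registered any more

// output: door open
console.log(received.join(', '));
// output: 0
console.log(socket.listeners('alarm').length);
```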

Filter an array emitted by an Observable using RxJS

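// inside pipe(): keep only the reports whose name matches
map((reports: IReport[]) => reports.filter(p => p.reportName === name))

// the same pattern, with the filtered array wrapped in a Promise
findItemsByParentId(id: number): Promise<Item[]> {
  return new Promise<Item[]>((resolve, reject) => {
    this.localStorage.getItem('items')
      .pipe(map((items: Item[]) => items.filter((item: Item) => item.parentId === id)))
      .subscribe({
        next: (value: Item[]) => resolve(value),
        // reject the Promise if the source Observable errors
        error: (err) => reject(err)
      });
  });
}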
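
As a self-contained illustration of the map plus Array.prototype.filter pattern, the sketch below (assuming RxJS v6+) uses of() as a stand-in for the real source. The IReport shape and the sample data are hypothetical.

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// hypothetical report shape, for illustration only
interface IReport {
  reportName: string;
  pages: number;
}

const data: IReport[] = [
  { reportName: 'sales', pages: 12 },
  { reportName: 'audit', pages: 3 },
  { reportName: 'sales', pages: 7 }
];

// stand-in source that emits the whole array as a single value
const reports$ = of(data);

const name = 'sales';
let result: IReport[] = [];

reports$
  // map transforms the emitted array; Array.prototype.filter picks its elements
  .pipe(map((reports: IReport[]) => reports.filter(p => p.reportName === name)))
  .subscribe(filtered => (result = filtered));

// output: 12,7
console.log(result.map(r => r.pages).join(','));
```

The RxJS filter operator would not work here, because it tests each emitted value as a whole (the entire array) rather than the elements inside it.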

References
https://www.learnrxjs.io/operators/transformation/map.html
https://rxjs.dev/api/operators/map
https://stackoverflow.com/questions/52263374/angular-rxjs-observable-filter-on-array-of-objects

Filter an Observable using RxJS

filter objects based on property

// RxJS v6+
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

//emit ({name: 'Joe', age: 31}, {name: 'Bob', age:25})
const source = from([{ name: 'Joe', age: 31 }, { name: 'Bob', age: 25 }]);
//filter out people with age under 30
const example = source.pipe(filter(person => person.age >= 30));
//output: "Over 30: Joe"
const subscribe = example.subscribe(val => console.log(`Over 30: ${val.name}`));

References
https://www.learnrxjs.io/operators/filtering/filter.html
https://rxjs.dev/api/operators/filter