Subjects in RxJava

  • PublishSubject
  • BehaviorSubject
  • AsyncSubject
  • ReplaySubject
  • UnicastSubject
  • SingleSubject

A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by re-emitting them, and it can also emit new items.

Publish Subject

It emits to an observer only those items that the source Observable emits after that observer subscribes.

Think of a student who enters the classroom late and only wants to listen from the moment he walks in.

PublishSubject<Integer> source = PublishSubject.create();

// It will get 1, 2, 3, 4 and onComplete
source.subscribe(getFirstObserver()); 

source.onNext(1);
source.onNext(2);
source.onNext(3);

// It will get 4 and onComplete for second observer also.
source.subscribe(getSecondObserver());

source.onNext(4);
source.onComplete();

Replay Subject

It emits all the items of the source Observable, regardless of when the subscriber subscribes.

Think of a student who enters the classroom late but wants to hear the lecture from the beginning.

ReplaySubject<Integer> source = ReplaySubject.create();
// It will get 1, 2, 3, 4 and onComplete
source.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
// It will also get 1, 2, 3, 4 and onComplete, because a ReplaySubject replays everything
source.subscribe(getSecondObserver());

Behavior Subject

It emits the most recently emitted item and all the subsequent items of the source Observable when an observer subscribes to it.

Think of a student who enters the classroom late and wants to hear the most recent point the professor made (not everything from the beginning), so that he gets the context for what follows.

BehaviorSubject<Integer> source = BehaviorSubject.create();
// It will get 1, 2, 3, 4 and onComplete
source.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
// It will get 3 (last emitted) and 4 (subsequent item) and onComplete
source.subscribe(getSecondObserver());
source.onNext(4);
source.onComplete();

Async Subject

It emits only the last value of the source Observable, and only after the source Observable completes. If the source emits no values, the AsyncSubject completes without emitting anything.

Think of a student who may enter the classroom at any time and only wants to hear the very last thing that was taught, once the class is over.

AsyncSubject<Integer> source = AsyncSubject.create();
// It will get only 4 and onComplete
source.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
// It will also get only 4 and onComplete
source.subscribe(getSecondObserver());
source.onNext(4);
source.onComplete();
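
The four snippets above rely on getFirstObserver() and getSecondObserver(), which are not shown here. As a rough, self-contained sketch, the program below feeds the same sequence into each subject and records what a late observer receives. It assumes RxJava 2.x on the classpath (the io.reactivex packages); the class name SubjectComparison and the helper lateObserverSees are made up for this illustration.

```java
import io.reactivex.subjects.AsyncSubject;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class SubjectComparison {

    // Emits 1, 2, 3, then attaches a late observer, then emits 4 and completes.
    // Returns every item the late observer received.
    static List<Integer> lateObserverSees(Subject<Integer> source) {
        List<Integer> received = new ArrayList<>();
        source.onNext(1);
        source.onNext(2);
        source.onNext(3);
        source.subscribe(received::add); // the "late student"
        source.onNext(4);
        source.onComplete();
        return received;
    }

    public static void main(String[] args) {
        System.out.println("Publish:  " + lateObserverSees(PublishSubject.create()));  // [4]
        System.out.println("Replay:   " + lateObserverSees(ReplaySubject.create()));   // [1, 2, 3, 4]
        System.out.println("Behavior: " + lateObserverSees(BehaviorSubject.create())); // [3, 4]
        System.out.println("Async:    " + lateObserverSees(AsyncSubject.create()));    // [4]
    }
}
```

Everything runs synchronously on the calling thread here, since no Scheduler is involved, so the returned list is complete by the time the helper returns.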

References
http://reactivex.io/documentation/subject.html
https://blog.mindorks.com/understanding-rxjava-subject-publish-replay-behavior-and-async-subject-224d663d452f
https://www.journaldev.com/22573/rxjava-subject

Create Observable using empty method in RxJava

This type of source signals completion immediately upon subscription.

Observable<String> empty = Observable.empty();

empty.subscribe(new DefaultObserver<String>() {
    @Override
    public void onNext(String s) {
        System.out.println("This should never be printed!");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Or this!");
    }

    @Override
    public void onComplete() {
        System.out.println("Done will be printed.");
    }
});

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#empty

Create Observable using interval method in RxJava

Periodically generates an infinite, ever-increasing sequence of numbers (of type Long). The intervalRange variant generates a limited number of such values.

Observable<Long> clock = Observable.interval(1, TimeUnit.SECONDS);

clock.subscribe(time -> {
    if (time % 2 == 0) {
        System.out.println("Tick");
    } else {
        System.out.println("Tock");
    }
});
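
One practical caveat: interval emits on the computation scheduler by default, and in a plain main method the program may exit before any tick is printed. The sketch below uses the intervalRange variant together with a blocking call so the result can be observed. It assumes RxJava 2.x on the classpath; the class name IntervalRangeDemo and the helper fiveTicks are hypothetical.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
public class IntervalRangeDemo {

    // Emits 0..4, one value every 100 ms, maps each value to "Tick" or "Tock",
    // and blocks the caller until the sequence has completed.
    static List<String> fiveTicks() {
        return Observable.intervalRange(0, 5, 0, 100, TimeUnit.MILLISECONDS)
                .map(time -> time % 2 == 0 ? "Tick" : "Tock")
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(fiveTicks()); // [Tick, Tock, Tick, Tock, Tick]
    }
}
```

Blocking like this is mainly useful in demos and tests; in an application the subscription would normally stay asynchronous.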

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#interval
http://reactivex.io/documentation/operators/interval.html

Schedulers in RxJava

By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

 

As shown in this illustration, the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.
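
As a small sketch of that difference, the program below records the thread on which the source produces its item (chosen by subscribeOn) and the thread on which a downstream operator runs (chosen by observeOn). It assumes RxJava 2.x on the classpath; ThreadHopDemo and its helper method are names invented for this example, and the exact thread names depend on the RxJava version.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

// Hypothetical demo class, not part of RxJava.
public class ThreadHopDemo {

    // Returns { thread that produced the item, thread that ran the operator below observeOn }.
    static String[] producerAndConsumerThreads() {
        return Observable.fromCallable(() -> Thread.currentThread().getName()) // runs where subscribeOn says
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.single())
                // everything below observeOn runs on the single scheduler
                .map(producer -> new String[] {producer, Thread.currentThread().getName()})
                .blockingFirst();
    }

    public static void main(String[] args) {
        String[] threads = producerAndConsumerThreads();
        System.out.println("produced on: " + threads[0]);
        System.out.println("consumed on: " + threads[1]);
    }
}
```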

Schedulers.newThread

This scheduler simply starts a new thread every time it is requested via subscribeOn() or observeOn().

Observable.just("Hello")
        .observeOn(Schedulers.newThread())
        .doOnNext(s ->
                System.out.println(Thread.currentThread().getName())
        )
        .observeOn(Schedulers.newThread())
        .subscribe(s ->
                {
                    System.out.println(s);
                    System.out.println(Thread.currentThread().getName());
                }

        );

Schedulers.immediate

Schedulers.immediate is a special scheduler that invokes a task on the client thread in a blocking way, rather than asynchronously, and returns when the action is completed. In fact, subscribing to an Observable via the immediate Scheduler typically has the same effect as not subscribing with any particular Scheduler at all. Note that Schedulers.immediate() belongs to RxJava 1.x; it was removed in RxJava 2.x, where Schedulers.trampoline() is the usual substitute.

observable.subscribeOn(Schedulers.immediate())

Schedulers.trampoline

It is similar to the immediate scheduler in that it also runs work on the current thread and blocks it. However, instead of invoking a task right away, it queues the task and runs it once the current task has finished. Trampoline schedulers come in handy when we have more than one observable and want them to execute in order.

observable.subscribeOn(Schedulers.trampoline())

Schedulers.io

This Scheduler is similar to the newThread except for the fact that already started threads are recycled and can possibly handle future requests.

Observable.just("Hello")
        .observeOn(Schedulers.io())
        .doOnNext(s ->
                System.out.println(Thread.currentThread().getName())
        )
        .subscribeOn(Schedulers.io())
        .subscribe(s ->
                {
                    System.out.println(s);
                    System.out.println(Thread.currentThread().getName());
                }

        );

Schedulers.from

The from factory method wraps an existing Executor and turns it into a Scheduler.

Executor executor = Executors.newSingleThreadExecutor();

Observable.just("Hello")
        .observeOn(Schedulers.from(executor))
        .doOnNext(s ->
                System.out.println(Thread.currentThread().getName())
        )
        .subscribe(s ->
                {
                    System.out.println(s);
                    System.out.println(Thread.currentThread().getName());
                }

        );
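
Note that an executor created this way keeps its thread alive until it is shut down. The variation below is a sketch under a few assumptions: RxJava 2.x on the classpath, an ExecutorService with a named thread (the name "my-worker" and the class ExecutorSchedulerDemo are made up here), and a blocking call so the executor can be shut down once the item has arrived.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of RxJava.
public class ExecutorSchedulerDemo {

    // Runs the downstream operator on a custom executor and returns the name
    // of the thread that executed it.
    static String threadUsed() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(task -> new Thread(task, "my-worker"));
        try {
            return Observable.just("Hello")
                    .observeOn(Schedulers.from(executor))
                    .map(s -> Thread.currentThread().getName())
                    .blockingFirst();
        } finally {
            executor.shutdown(); // release the thread so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println(threadUsed()); // my-worker
    }
}
```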

Schedulers.single

This scheduler is quite simple, as it is backed by just one thread. No matter how many observables use it, their work runs on that one thread, one task after another. It can be thought of as a replacement for your main thread.

observable.subscribeOn(Schedulers.single())

Schedulers.computation

This scheduler is quite similar to the io scheduler in that it is also backed by a thread pool. However, the number of threads it can use is fixed, and by default equals the number of processors available to the JVM.

observable.subscribeOn(Schedulers.computation())

Android Scheduler

This Scheduler is provided by the RxAndroid library. It is used to bring execution back to the main thread so that the UI can be modified. It is usually passed to the observeOn method.

AndroidSchedulers.mainThread()

Observable.just("one", "two", "three", "four", "five")
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(/* an Observer */);

Observing on arbitrary loopers

Looper backgroundLooper = // ...
Observable.just("one", "two", "three", "four", "five")
        .observeOn(AndroidSchedulers.from(backgroundLooper))
        .subscribe(/* an Observer */);

References
http://reactivex.io/documentation/scheduler.html
https://android.jlelse.eu/rxjava-schedulers-what-when-and-how-to-use-it-6cfc27293add
https://www.baeldung.com/rxjava-schedulers

Create Observable using fromFuture method in RxJava

The Future interface represents the result of an asynchronous computation: a result that eventually becomes available in the Future once processing is complete.

public class SquareCalculator {    
     
    private ExecutorService executor 
      = Executors.newSingleThreadExecutor();
     
    public Future<Integer> calculate(Integer input) {        
        return executor.submit(() -> {
            Thread.sleep(1000);
            return input * input;
        });
    }
}
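
To make the blocking nature of a Future concrete, here is a minimal JDK-only sketch that submits the same kind of squaring task and waits for its result with get(). The class name FutureUsageDemo and the helper squareBlocking are hypothetical, and the delay is shortened for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class built only on java.util.concurrent.
public class FutureUsageDemo {

    // Submits the squaring task, blocks until the Future is done, and returns the result.
    static int squareBlocking(int input) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(() -> {
                Thread.sleep(200); // simulate slow work
                return input * input;
            });
            return future.get(); // blocks the calling thread until the value is ready
        } catch (Exception e) {
            // get() may throw InterruptedException or ExecutionException
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squareBlocking(4)); // 16
    }
}
```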

Given a pre-existing, already running or already completed java.util.concurrent.Future, wait for the Future to complete normally or with an exception in a blocking fashion and relay the produced value or exception to the consumers.

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

Future<String> future = executor.schedule(() -> "Hello world!", 1, TimeUnit.SECONDS);

Observable<String> observable = Observable.fromFuture(future);

observable.subscribe(
    item -> System.out.println(item), 
    error -> error.printStackTrace(),
    () -> System.out.println("Done"));

executor.shutdown();

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#fromfuture
https://www.baeldung.com/java-future

Create Observable using fromRunnable method in RxJava

When a consumer subscribes, the given java.lang.Runnable is invoked and the consumer completes or receives the exception the Runnable threw.

Runnable runnable = () -> System.out.println("Hello World!");

Completable completable = Completable.fromRunnable(runnable);

completable.subscribe(() -> System.out.println("Done"), error -> error.printStackTrace());

Note: the difference between fromAction and fromRunnable is that the Action interface allows throwing a checked exception while the java.lang.Runnable does not.

 

The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.
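
The difference can be seen with plain java.util.concurrent, without RxJava. In this sketch (the class name RunnableVsCallable and the helper describe are invented for illustration), the Future obtained from a Runnable yields null, while the Future obtained from a Callable yields the returned value.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class built only on the JDK.
public class RunnableVsCallable {

    // Runs one Runnable and one Callable and reports what each produced.
    static String describe() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            AtomicBoolean ran = new AtomicBoolean(false);
            Runnable runnable = () -> ran.set(true);    // no result, no checked exceptions
            Callable<String> callable = () -> "result"; // returns a value, may throw checked exceptions

            Future<?> fromRunnable = executor.submit(runnable);
            Future<String> fromCallable = executor.submit(callable);

            Object nothing = fromRunnable.get(); // null: a Runnable has nothing to return
            String value = fromCallable.get();   // the Callable's return value
            return ran.get() + ", " + nothing + ", " + value;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // true, null, result
    }
}
```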

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#fromrunnable
https://stackoverflow.com/questions/141284/the-difference-between-the-runnable-and-callable-interfaces-in-java

Create Observable using fromAction method in RxJava

When a consumer subscribes, the given io.reactivex.functions.Action is invoked and the consumer completes or receives the exception the Action threw.

Action action = () -> System.out.println("Hello World!");

Completable completable = Completable.fromAction(action);

completable.subscribe(() -> System.out.println("Done"), error -> error.printStackTrace());

Note: the difference between fromAction and fromRunnable is that the Action interface allows throwing a checked exception while the java.lang.Runnable does not.
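
As a sketch of that note, the Action below throws a checked IOException without any try/catch, and the exception arrives in the onError callback. This assumes RxJava 2.x on the classpath; the class name FromActionErrorDemo, the helper name, and the exception message are made up for the example.

```java
import io.reactivex.Completable;
import io.reactivex.functions.Action;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of RxJava.
public class FromActionErrorDemo {

    // Subscribes to a Completable whose Action throws a checked exception and
    // returns the Throwable delivered to onError (or null if it completed).
    static Throwable errorFromFailingAction() {
        Action failing = () -> {
            // A checked exception is allowed here because Action.run() declares one.
            throw new IOException("disk not available");
        };

        AtomicReference<Throwable> seen = new AtomicReference<>();
        Completable.fromAction(failing)
                .subscribe(() -> System.out.println("Done"), seen::set);
        return seen.get(); // set synchronously, since no Scheduler is involved
    }

    public static void main(String[] args) {
        System.out.println(errorFromFailingAction());
    }
}
```

The same lambda body would not compile as a java.lang.Runnable, because Runnable.run() declares no checked exceptions.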

 

Create Observable using fromCallable method in RxJava

When a consumer subscribes, the given java.util.concurrent.Callable is invoked and its returned value (or thrown exception) is relayed to that consumer.

The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.

Callable<String> callable = () -> {
    System.out.println("Hello World!");
    return "Hello World!";
};

Observable<String> observable = Observable.fromCallable(callable);

observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(), 
    () -> System.out.println("Done"));

Remark: In Completable, the actual returned value is ignored and the Completable simply completes.
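
A minimal sketch of that remark, assuming RxJava 2.x on the classpath (the class and helper names are hypothetical): the Callable returns a String, but the Completable has no way to pass it on and only signals completion.

```java
import io.reactivex.Completable;

import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of RxJava.
public class CompletableFromCallableDemo {

    // Returns true if the Completable signalled onComplete.
    static boolean completesIgnoringValue() {
        AtomicBoolean completed = new AtomicBoolean(false);
        Completable.fromCallable(() -> "this value is dropped")
                .subscribe(() -> completed.set(true), error -> error.printStackTrace());
        return completed.get(); // set synchronously, since no Scheduler is involved
    }

    public static void main(String[] args) {
        System.out.println(completesIgnoringValue()); // true
    }
}
```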

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#fromcallable
https://stackoverflow.com/questions/141284/the-difference-between-the-runnable-and-callable-interfaces-in-java

Create Observable using fromArray method in RxJava

Constructs a sequence from a pre-existing source or generator type.
Signals the elements of the given array and then completes the sequence.

String[] array = new String[10];
for (int i = 0; i < array.length; i++) {
    array[i] = String.valueOf(i);
}

Observable<String> observable = Observable.fromArray(array);

observable.subscribe(s -> {
    System.out.println(s);
});

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#fromarray