Create Observable using interval method in RxJava

Periodically generates an infinite, ever increasing numbers (of type Long). The intervalRangevariant generates a limited amount of such numbers.

Observable<Long> clock = Observable.interval(1, TimeUnit.SECONDS);

clock.subscribe(time -> {
    if (time % 2 == 0) {
        System.out.println("Tick");
    } else {
        System.out.println("Tock");
    }
});

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#interval
http://reactivex.io/documentation/operators/interval.html

Schedulers in RxJava

By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

 

As shown in this illustration, the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.

Schedulers.newThread

This scheduler simply starts a new thread every time it is requested via subscribeOn() or observeOn().

Observable.just("Hello")
        .observeOn(Schedulers.newThread())
        .doOnNext(s ->
                System.out.println(Thread.currentThread().getName())
        )
        .observeOn(Schedulers.newThread())
        .subscribe(s ->
                {
                    System.out.println(s);
                    System.out.println(Thread.currentThread().getName());
                }

        );

Schedulers.immediate

Schedulers.immediate is a special scheduler that invokes a task within the client thread in a blocking way, rather than asynchronously and returns when the action is completed.In fact, subscribing to an Observable via immediate Scheduler typically has the same effect as not subscribing with any particular Scheduler at all.

observable.subscribeOn(Schedulers.immediate())

Schedulers.trampoline

It is quite similar to ImmediateScheduler as it also blocks the thread, however, it waits for the current task to execute completely(while Immediate Scheduler invokes the task right away). Trampoline schedulers come in handy when we have more than one observable and we want them to execute in order.

observable.subscribeOn(Schedulers.trampoline())

Schedulers.io

This Scheduler is similar to the newThread except for the fact that already started threads are recycled and can possibly handle future requests.

Observable.just("Hello")
        .observeOn(Schedulers.io())
        .doOnNext(s ->
                System.out.println(Thread.currentThread().getName())
        )
        .subscribeOn(Schedulers.io())
        .subscribe(s ->
                {
                    System.out.println(s);
                    System.out.println(Thread.currentThread().getName());
                }

        );

Schedulers.from

there is a wrapper that can turn Executor into Scheduler using the from factory method

Executor executor = Executors.newSingleThreadExecutor();

Observable.just("Hello")
        .observeOn(Schedulers.from(executor))
        .doOnNext(s ->
                System.out.println(Thread.currentThread().getName())
        )
        .subscribe(s ->
                {
                    System.out.println(s);
                    System.out.println(Thread.currentThread().getName());
                }

        );

Schedulers.single

This scheduler is quite simple as it is backed just by one single thread. So no matter how many observables are there, it will run only on that one thread. It can be thought as a replacement to your main thread.

observable.subscribeOn(Schedulers.single())

Schedulers.computation

This scheduler is quite similar to IO Schedulers as this is backed by thread pool too. However, the number of threads that can be used is fixed to the number of cores present in the system.

observable.subscribeOn(Schedulers.computation())

Android Scheduler

This Scheduler is provided by rxAndroid library. This is used to bring back the execution to the main thread so that UI modification can be made. This is usually used in observeOn method.

AndroidSchedulers.mainThread()
Observable.just("one", "two", "three", "four", "five")
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(/* an Observer */);

Observing on arbitrary loopers

Looper backgroundLooper = // ...
Observable.just("one", "two", "three", "four", "five")
        .observeOn(AndroidSchedulers.from(backgroundLooper))
        .subscribe(/* an Observer */)

References
http://reactivex.io/documentation/scheduler.html
https://android.jlelse.eu/rxjava-schedulers-what-when-and-how-to-use-it-6cfc27293add
https://www.baeldung.com/rxjava-schedulers

Create Observable using fromFuture method in RxJava

the Future class represents a future result of an asynchronous computation – a result that will eventually appear in the Future after the processing is complete.

public class SquareCalculator {    
     
    private ExecutorService executor 
      = Executors.newSingleThreadExecutor();
     
    public Future<Integer> calculate(Integer input) {        
        return executor.submit(() -> {
            Thread.sleep(1000);
            return input * input;
        });
    }
}

Given a pre-existing, already running or already completed java.util.concurrent.Future, wait for the Future to complete normally or with an exception in a blocking fashion and relay the produced value or exception to the consumers.

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

Future<String> future = executor.schedule(() -> "Hello world!", 1, TimeUnit.SECONDS);

Observable<String> observable = Observable.fromFuture(future);

observable.subscribe(
    item -> System.out.println(item), 
    error -> error.printStackTrace(),
    () -> System.out.println("Done"));

executor.shutdown();

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#fromfuture
https://www.baeldung.com/java-future

Create Observable using fromRunnable method in RxJava

When a consumer subscribes, the given io.reactivex.function.Action is invoked and the consumer completes or receives the exception the Action threw.

Runnable runnable = () -> System.out.println("Hello World!");

Completable completable = Completable.fromRunnable(runnable);

completable.subscribe(() -> System.out.println("Done"), error -> error.printStackTrace());

Note: the difference between fromAction and fromRunnable is that the Action interface allows throwing a checked exception while the java.lang.Runnable does not.

 

The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#fromrunnable
https://stackoverflow.com/questions/141284/the-difference-between-the-runnable-and-callable-interfaces-in-java

Create Observable using fromAction method in RxJava

When a consumer subscribes, the given io.reactivex.function.Action is invoked and the consumer completes or receives the exception the Action threw.

Action action = () -> System.out.println("Hello World!");

Completable completable = Completable.fromAction(action);

completable.subscribe(() -> System.out.println("Done"), error -> error.printStackTrace());

Note: the difference between fromAction and fromRunnable is that the Action interface allows throwing a checked exception while the java.lang.Runnable does not.

 

Create Observable using fromCallable method in RxJava

When a consumer subscribes, the given java.util.concurrent.Callable is invoked and its returned value (or thrown exception) is relayed to that consumer.

The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.

Callable<String> callable = () -> {
    System.out.println("Hello World!");
    return "Hello World!");
}

Observable<String> observable = Observable.fromCallable(callable);

observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(), 
    () -> System.out.println("Done"));

Remark: In Completable, the actual returned value is ignored and the Completable simply completes.

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#fromcallable
https://stackoverflow.com/questions/141284/the-difference-between-the-runnable-and-callable-interfaces-in-java

Create Observable using fromArray method in RxJava

Constructs a sequence from a pre-existing source or generator type.
Signals the elements of the given array and then completes the sequence.

String[] array = new String[10];
for (int i = 0; i < array.length; i++) {
    array[i] = String.valueOf(i);
}

Observable<String> observable = Observable.fromArray(array);

observable.subscribe(s -> {
    System.out.println(s);
});

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#fromarray

Create Observable using fromIterable method in RxJava

Constructs a sequence from a pre-existing source or generator type.
Signals the items from a java.lang.Iterable source (such as Lists, Sets or Collections or custom Iterables) and then completes the sequence.

List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");

Observable<String> observable = Observable.fromIterable(list);

observable.subscribe(s -> {
    System.out.println(s);
});

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#fromiterable

Create Observable using just method in RxJava

Constructs a reactive type by taking a pre-existing object and emitting that specific object to the downstream consumer upon subscription.

Observable<String> observable = Observable.just("1");

observable.subscribe(s -> {
    System.out.println(s);
});
Observable<String> observable = Observable.just("1", "2", "3", "4");

observable.subscribe(s -> {
    System.out.println(s);
});

References
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#just