RxJava Flashcards

1
Q

What is RxJava?

A

RxJava is a Java based extension of ReactiveX. It provides implementation or ReactiveX project in Java. Following are the key characteristics of RxJava.

  • Extends the observer pattern.
  • Support sequences of data/events.
  • Provides operators to compose sequences together declaratively.
  • Handles threading, synchronization, thread-safety and concurrent data structures internally.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
2
Q

What is ReactiveX?

A

ReactiveX is a project which aims to provide reactive programming concept to various programming languages. Reactive Programming refers to the scenario where program reacts as and when data appears. It is a event based programming concept and events can propagate to registers observers.

As per the Reactive, they have combined the best of Observer pattern, Iterator pattern and functional pattern.

ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
3
Q

What is Functional Programming?

A

Functional programming revolves around building the software using pure functions.

A pure function do not depends upon previous state and always returns the same result for the same parameters passed.

Pure functions helps avoiding problems associated with shared objects, mutable data and side effects often prevalent in multi-threading environments.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
4
Q

What is Reactive Programming?

A

Reactive programming refers to event driven programming where data streams comes in asynchronous fashion and get processed when they are arrived.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
5
Q

What is Functional Reactive Programming?

A

RxJava implements both the concepts together, where data of streams changes over time and consumer function reacts accordingly.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
6
Q

What is Reactive Manifesto?

A

Reactive Manifesto is an on-line document stating the high standard of application software systems. As per the manifesto, following are the key attributes of a reactive software :-

  • Responsive − Should always respond in a timely fashion.
  • Message Driven − Should use asynchronous message-passing between components so that they maintain loose coupling.
  • Elastic − Should stay responsive even under high load.
  • Resilient − Should stay responsive even if any component(s) fail.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
6
Q

Explain Key components of RxJava.

A

RxJava have two key components: Observables and Observer.

  • Observable − It represents an object similar to Stream which can emit zero or more data, can send error message, whose speed can be controlled while emitting a set of data, can send finite as well as infinite data.
  • Observer − It subscribes to Observable’s data of sequence and reacts per item of the observables. Observers are notified whenever Observable emits a data. An Observer handles data one by one.

An observer is never notified if items are not present or a callback is not returned for a previous item.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
7
Q

RxJava - How Observable works ?

A

Observables represents the sources of data where as Observers (Subscribers) listen to them. In nutshell, an Observable emits items and a Subscriber then consumes these items.

Observable

  • Observable provides data once subscriber starts listening.
  • Observable can emit any number of items.
  • Observable can emit only signal of completion as well with no item.
  • Observable can terminate successfully.
  • Observable may never terminate. e.g. a button can be clicked any number of times.
  • Observable may throw error at any point of time.

Subscriber

  • Observable can have multiple subscribers.
  • When an Observable emits an item, each subscriber onNext() method gets invoked.
  • When an Observable finished emitting items, each subscriber onComplete() method gets invoked.
  • If an Observable emits error, each subscriber onError() method gets invoked.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
8
Q

RxJava - Creating Observables

A

Following are the base classes to create observables.

  • Flowable − 0..N flows, Emits 0 or n items. Supports Reactive-Streams and back-pressure.
  • Observable − 0..N flows ,but no back-pressure.
  • Single − 1 item or error. Can be treated as a reactive version of method call.
  • Completable − No item emitted. Used as a signal for completion or error. Can be treated as a reactive version of Runnable.
  • MayBe − Either No item or 1 item emitted. Can be treated as a reactive version of Optional.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
9
Q

Explain the convenient methods to create observables in Observable class.

A

Following are the convenient methods to create observables in Observable class.

  • just(T item) − Returns an Observable that signals the given (constant reference) item and then completes.
  • fromIterable(Iterable source) − Converts an Iterable sequence into an ObservableSource that emits the items in the sequence.
  • fromArray(T… items) − Converts an Array into an ObservableSource that emits the items in the Array.
  • fromCallable(Callable supplier) − Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function.
  • fromFuture(Future future) − Converts a Future into an ObservableSource.
  • interval(long initialDelay, long period, TimeUnit unit) − Returns an Observable that emits a 0L after the initialDelay and ever increasing numbers after each period of time thereafter.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
10
Q

RxJava - Single Observable

A

The Single class represents the single value response. Single observable can only emit either a single successful value or an error. It does not emit onComplete event.

Class Declaration

Following is the declaration for io.reactivex.Single<T> class −</T>

public abstract class Single<T>
extends Object
implements SingleSource<T></T></T>

Protocol

Following is the sequential protocol that Single Observable operates −

onSubscribe (onSuccess | onError)?

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
11
Q

RxJava - Single Observable Example

A

Single Example

import java.util.concurrent.TimeUnit;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create the observable
Single<String> testSingle = Single.just("Hello World");</String>

  //Create an observer
  Disposable disposable = testSingle
     .delay(2, TimeUnit.SECONDS, Schedulers.io())
     .subscribeWith(
     new DisposableSingleObserver<String>() {

     @Override
     public void onError(Throwable e) { 
        e.printStackTrace();
     }

     @Override
     public void onSuccess(String value) {
        System.out.println(value);
     }
  }); 
  Thread.sleep(3000);
  //start observing
  disposable.dispose();    } }

Hello World

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
12
Q

RxJava - MayBe Observable

A

The MayBe class represents deferred response. MayBe observable can emit either a single successful value or no value.

Class Declaration

Following is the declaration for io.reactivex.Single<T> class −</T>

public abstract class Maybe<T>
extends Object
implements MaybeSource<T></T></T>

Protocol

Following is the sequential protocol that MayBe Observable operates −

onSubscribe (onSuccess | onError | OnComplete)?

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
13
Q

RxJava - MayBe Observable Example

A

MayBe Example

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create an observer
Disposable disposable = Maybe.just(“Hello World”)
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}</String>

     @Override
     public void onSuccess(String value) {
        System.out.println(value);
     }

     @Override
     public void onComplete() {
        System.out.println("Done!");
     }
  }); 
  Thread.sleep(3000);
  //start observing
  disposable.dispose();    } }
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
14
Q

RxJava - Completable Observable

A

The Completable class represents deferred response. Completable observable can either indicate a successful completion or error.

Class Declaration

Following is the declaration for io.reactivex.Completable class −

public abstract class Completable
extends Object
implements CompletableSource

Protocol

Following is the sequential protocol that Completable Observable operates −

onSubscribe (onError | onComplete)?

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
15
Q

RxJava - Completable Observable Example

A

Completable Example

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
public static void main(String[] args) throws InterruptedException {

  //Create an observer
  Disposable disposable = Completable.complete()
     .delay(2, TimeUnit.SECONDS, Schedulers.io())
     .subscribeWith(new DisposableCompletableObserver() {
     @Override
     public void onError(Throwable e) { 
        e.printStackTrace();
     }
     @Override
     public void onStart() {
        System.out.println("Started!");
     }
     @Override
     public void onComplete() {
        System.out.println("Done!");
     }
  }); 
  Thread.sleep(3000);
  //start observing
  disposable.dispose();    } }

Started Done

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
16
Q

RxJava - Using CompositeDisposable

A

The CompositeDisposable class represents a container which can hold multiple disposable and offers O(1) complexity of adding and removing disposables.

Class Declaration

Following is the declaration for io.reactivex.disposables.CompositeDisposable class −

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

17
Q

RxJava - Using CompositeDisposable Example

A

CompositeDisposable Example

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
CompositeDisposable compositeDisposable = new CompositeDisposable();

  //Create an Single observer 
  Disposable disposableSingle = Single.just("Hello World")
  .delay(2, TimeUnit.SECONDS, Schedulers.io())
  .subscribeWith(
  new DisposableSingleObserver<String>() {
     @Override
     public void onError(Throwable e) {
        e.printStackTrace();
     }

     @Override
     public void onSuccess(String value) {
        System.out.println(value);
     }
  }); 

  //Create an observer
  Disposable disposableMayBe = Maybe.just("Hi")
  .delay(2, TimeUnit.SECONDS, Schedulers.io())
  .subscribeWith(new DisposableMaybeObserver<String>() {
     @Override
     public void onError(Throwable e) { 
        e.printStackTrace();
     }

     @Override
     public void onSuccess(String value) {
        System.out.println(value);
     }

     @Override
     public void onComplete() {
        System.out.println("Done!");
     }
  }); 

  Thread.sleep(3000);

  compositeDisposable.add(disposableSingle);
  compositeDisposable.add(disposableMayBe);

  //start observing
  compositeDisposable.dispose();    } }

Hello World Hi

18
Q

RxJava - Creating Operators

A

Following are the operators which are used to create an Observable.
1. Create : Creates an Observable from scratch and allows observer method to call programmatically.

  1. Defer : Do not create an Observable until an observer subscribes. Creates a fresh observable for each observer.
  2. Empty/Never/Throw : Creates an Observable with limited behavior.
  3. From : Converts an object/data structure into an Observable.
  4. Interval : Creates an Observable emitting integers in sequence with a gap of specified time interval.
  5. Just : Converts an object/data structure into an Observable to emit the same or same type of objects.
  6. Range : Creates an Observable emitting integers in sequence of given range.
  7. Repeat : Creates an Observable emitting integers in sequence repeatedly.
  8. Start : Creates an Observable to emit the return value of a function.
  9. Timer : Creates an Observable to emit a single item after given delay.
19
Q

Creating Operator Example

A

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {“a”, “b”, “c”, “d”, “e”, “f”, “g”};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.map(String::toUpperCase)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}</String>

ABCDEFG

20
Q

RxJava - Transforming Operators

A

Following are the operators which are used to transform an item emitted from an Observable:-

  1. Buffer : Gathers items from Observable into bundles periodically and then emit the bundles rather than items.
  2. FlatMap : Used in nested observables. Transforms items into Observables. Then flatten the items into single Observable.
  3. GroupBy : Divide an Observable into set of Observables organized by key to emit different group of items.
  4. Map : Apply a function to each emitted item to transform it.
  5. Scan : Apply a function to each emitted item, sequentially and then emit the successive value.
  6. Window : Gathers items from Observable into Observable windows periodically and then emit the windows rather than items.
21
Q

RxJava - Filtering Operators

A

Following are the operators which are used to selectively emit item(s) from an Observable:-

  1. Debounce : Emits items only when timeout occurs without emiting another item.
  2. Distinct : Emits only unique items.
  3. ElementAt : emit only item at n index emitted by an Observable.
  4. Filter : Emits only those items which pass the given predicate function.
  5. First : Emits the first item or first item which passed the given criteria.
  6. IgnoreElements : Do not emits any items from Observable but marks completion.
  7. Last : Emits the last element from Observable.
  8. Sample : Emits the most recent item with given time interval.
  9. Skip : Skips the first n items from an Observable.
  10. SkipLast : Skips the last n items from an Observable.
  11. Take : takes the first n items from an Observable.
  12. TakeLast : takes the last n items from an Observable.
22
Q

RxJava - Filtering Operator Example

A

Filtering Operator Example

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {“a”, “b”, “c”, “d”, “e”, “f”, “g”};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.take(2)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}</String>

ab

23
Q

RxJava - Combining Operators

A

Following are the operators which are used to create a single Observable from multiple Observables :-

  1. And/Then/When : Combine item sets using Pattern and Plan intermediaries.
  2. CombineLatest : Combine the latest item emitted by each Observable via a specified function and emit resulted item.
  3. Join : Combine items emitted by two Observables if emitted during time-frame of second Observable emitted item.
  4. Merge : Combines the items emitted of Observables.
  5. StartWith : Emit a specified sequence of items before starting to emit the items from the source Observable
  6. Switch : Emits the most recent items emitted by Observables.
  7. Zip : Combines items of Observables based on function and emits the resulted items.
24
Q

RxJava - Combining Operators Example

A

Combining Operator Example

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
public static void main(String[] args) {
Integer[] numbers = { 1, 2, 3, 4, 5, 6};
String[] letters = {“a”, “b”, “c”, “d”, “e”, “f”, “g”};
final StringBuilder result = new StringBuilder();
Observable<String> observable1 = Observable.fromArray(letters);
Observable<Integer> observable2 = Observable.fromArray(numbers);
Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}</Integer></String>

g1g2g3g4g5g6

25
Q

RxJava - Utility Operators

A

Following are the operators which are often useful with Observables.
1
Delay

Register action to handle Observable life-cycle events.

2
Materialize/Dematerialize

Represents item emitted and notification sent.

3
ObserveOn

Specify the scheduler to be observed.

4
Serialize

Force Observable to make serialized calls.

5
Subscribe

Operate upon the emissions of items and notifications like complete from an Observable

6
SubscribeOn

Specify the scheduler to be used by an Observable when it is subscribed to.

7
TimeInterval

Convert an Observable to emit indications of the amount of time elapsed between emissions.

8
Timeout

Issues error notification if specified time occurs without emitting any item.

9
Timestamp

Attach timestamp to each item emitted.

9
Using

Creates a disposable resource or same lifespan as that of Observable.

26
Q

RxJava - Subjects

A

As per the Reactive, a Subject can act as both Observable as well as Observer.

A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.

There are four types of Subjects −

1
Publish Subject

Emits only those items which are emitted after time of subscription.

2 Replay Subject
Emits all the items emitted by source Observable regardless of when it has subscribed the Observable.

3
Behavior Subject

Upon subscription, emits the most recent item then continue to emit item emitted by the source Observable.

4
Async Subject

Emits the last item emitted by the source Observable after it’s completes emission.

27
Q

RxJava - Schedulers

A

Schedulers are used in multi-threading environment to work with Observable operators.

As per the Reactive,Scheduler are used to schedule how chain of operators will apply to different threads.

By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

28
Q

RxJava - Schedulers Types

A

There are following types of Schedulers available in RxJava −

1
Schedulers.computation()

Creates and returns a Scheduler intended for computational work. Count of threads to be scheduled depends upon the CPUs present in the system. One thread is allowed per CPU. Best for event-loops or callback operations.

2
Schedulers.io()

Creates and returns a Scheduler intended for IO-bound work. Thread pool may extend as needed.

3
Schedulers.newThread()

Creates and returns a Scheduler that creates a new Thread for each unit of work.

4
Schedulers.trampoline()

Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes.

4
Schedulers.from(java.util.concurrent.Executor executor)

Converts an Executor into a new Scheduler instance.

29
Q

RxJava - Trampoline Scheduler Example

A

Schedulers.trampoline() method creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes.

Schedulers.trampoline() Example

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just(“A”, “AB”, “ABC”)
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println(“Processing Thread “
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.trampoline()))
.subscribe(length -> System.out.println(“Receiver Thread “
+ Thread.currentThread().getName()
+ “, Item length “ + length));

     Thread.sleep(10000);    }    protected static Observable<Integer> getLengthWithDelay(String v) {
  Random random = new Random();
  try {
     Thread.sleep(random.nextInt(3) * 1000);
     return Observable.just(v.length());
  } catch (InterruptedException e) {
     e.printStackTrace();
  }
  return null;    } }
30
Q

RxJava - NewThread Scheduler Example

A

Schedulers.newThread() method creates and returns a Scheduler that creates a new Thread for each unit of work.

Schedulers.newThread() Example

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just(“A”, “AB”, “ABC”)
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println(“Processing Thread “
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread()))
.subscribe(length -> System.out.println(“Receiver Thread “
+ Thread.currentThread().getName()
+ “, Item length “ + length));

     Thread.sleep(10000);    }    protected static Observable<Integer> getLengthWithDelay(String v) {
  Random random = new Random();
  try {
     Thread.sleep(random.nextInt(3) * 1000);
     return Observable.just(v.length());
  } catch (InterruptedException e) {
     e.printStackTrace();
  }
  return null;    } }
31
Q

RxJava - Computation Scheduler Example

A

Schedulers.computation() method creates and returns a Scheduler intended for computational work. Count of threads to be scheduled depends upon the CPUs present in the system. One thread is allowed per CPU. Best for event-loops or callback operations.

Schedulers.computation() Example

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just(“A”, “AB”, “ABC”)
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println(“Processing Thread “
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation()))
.subscribe(length -> System.out.println(“Receiver Thread “
+ Thread.currentThread().getName()
+ “, Item length “ + length));

     Thread.sleep(10000);    }    protected static Observable<Integer> getLengthWithDelay(String v) {
  Random random = new Random();
  try {
     Thread.sleep(random.nextInt(3) * 1000);
     return Observable.just(v.length());
  } catch (InterruptedException e) {
     e.printStackTrace();
  }
  return null;    } }
32
Q

RxJava - IO Scheduler Example

A

Schedulers.io() method creates and returns a Scheduler intended for IO-bound work. Thread pool may extend as needed. Best for I/O intensive operations.

Schedulers.io() Example

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just(“A”, “AB”, “ABC”)
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println(“Processing Thread “
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.io()))
.subscribe(length -> System.out.println(“Receiver Thread “
+ Thread.currentThread().getName()
+ “, Item length “ + length));

     Thread.sleep(10000);    }    protected static Observable<Integer> getLengthWithDelay(String v) {
  Random random = new Random();
  try {
     Thread.sleep(random.nextInt(3) * 1000);
     return Observable.just(v.length());
  } catch (InterruptedException e) {
     e.printStackTrace();
  }
  return null;    } }
33
Q

RxJava - From Scheduler Example

A

Schedulers.from(Executor) method converts an Executor into a new Scheduler instance.

Schedulers.from(Executor) Example

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just(“A”, “AB”, “ABC”)
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println(“Processing Thread “
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
.subscribe(length -> System.out.println(“Receiver Thread “
+ Thread.currentThread().getName()
+ “, Item length “ + length));

     Thread.sleep(10000);    }    protected static Observable<Integer> getLengthWithDelay(String v) {
  Random random = new Random();
  try {
     Thread.sleep(random.nextInt(3) * 1000);
     return Observable.just(v.length());
  } catch (InterruptedException e) {
     e.printStackTrace();
  }
  return null;    } }
34
Q

What are hot and cold observables?

A

When the observable produces data itself, it is the cold observable. Conversely, when the observable produces data outside, it is the hot observable. While the cold observables start to run upon subscription and push values, hot observables produce values even before a subscription occurs.

35
Q

How can you transform a cold observable into a hot one?

A

There are two ways of converting a cold observable into a hot one. The first method uses publish() .connect(). The publish() converts cold observable to ConnectableObservable, which behaves like a hot one. Once triggered with the .connect() function, the observable publishes an event irrespective of whether there are subscribers. Another way to convert a cold observable into a hot one is by wrapping it using a subject. The subject first subscribes to a cold observable and exposes itself as a cold observable to other subscribers. The work gets performed irrespective of whether there are any subscribers.

36
Q

What is the difference between switchMap(), flatMap() and concatMap()?

A

SwitchMap(), flatMap() and concatMap() are operators that help in modifying data emitted by an observable. The primary difference between concatMap() and flatMap() is in the order in which they emit items. While flatMap() can interleave items while emitting, meaning that it does not maintain the order of the emitted items, concatMap() can preserve the order. ConcatMap() waits for each observable to complete the work. SwitchMap() is another Rx transformation operator that unsubscribes from previous observables after emitting new ones, which means only the most recently emitted observables get emitted.

37
Q

Explain the difference between OnNext(), OnError() and OnCompleted().

A

OnNext() gets called when the observable emits a new item and you can perform some action on each item, whereas the OnError() interface method gets called when an error occurs and the data does not get completed. The OnComplete() interface method gets called when the observable emits all items.

38
Q

Outline the difference between subscribeOn() and observeOn().

A

The SubscribeOn() operator conveys the scheduler the source to perform the required work. As the observable chain has only one initial source, every observable has only one subscribeOn() operator. Conversely, ObserveOn() conveys the scheduler as the source to perform all downstream operations. As there can be many such operators, having multiple observeOn() operators in a single chain observable is ideal. SubscribeOn() impacts all upstream operators, whereas observeOn() impacts all downstream operators.

If the developer specifies only subscribeOn(), all operators get executed on that thread. If the developer specifies only observeOn() operator, all operators on the current thread get executed. Only operators below the observeOn() get switched to threads specified by the observeOn().

39
Q

What is RxJava Flowable ?

A

Flowable is an extension of the Observable class in RxJava that is specifically designed to handle backpressure, which occurs when the rate at which data is emitted is faster than the rate at which it can be consumed.

A Flowable is similar to an Observable in that it represents a stream of data or events that can be observed and processed by Observers. However, Flowable adds the ability to handle backpressure, which is crucial when dealing with large amounts of data or slow consumers. Backpressure refers to the mechanism of controlling the flow of data so that the consumer can process it at its own pace, avoiding overwhelming or dropping data.

40
Q

Observable vs. Flowable

A

In the previous version of RxJava, there was only one base class for dealing with backpressure-aware and non-backpressure-aware sources – Observable.

RxJava 2 introduced a clear distinction between these two kinds of sources – backpressure-aware sources are now represented using a dedicated class – Flowable.

Observable sources don’t support backpressure. Because of that, we should use it for sources that we merely consume and can’t influence.

Also, if we’re dealing with a big number of elements, two possible scenarios connected with backpressure can occur depending on the type of the Observable.

In case of using a so-called “cold Observable“, events are emitted lazily, so we’re safe from overflowing an observer.

When using a “hot Observable” however, this will continue to emit events, even if the consumer can’t keep up.

41
Q

How to Creating a Flowable in RxJava?

A

Flowables can be created in a similar way to Observables. Here are a few common methods:

  1. Simple Flowable :
    We can create a Flowable using the just() method similarly as we could with Observable :

Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);</Integer>

Even though using the just() is quite simple, it isn’t very common to create a Flowable from static data, and it’s used for testing purposes.

  1. Flowable from Observable: When we have an Observable we can easily transform it to Flowable using the toFlowable() method:

Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable
.toFlowable(BackpressureStrategy.BUFFER);</Integer></Integer>

Notice that to be able to perform the conversion, we need to enrich the Observable with a BackpressureStrategy.
  1. Flowable from FlowableOnSubscribe:

RxJava 2 introduced a functional interface FlowableOnSubscribe, which represents a Flowable that starts emitting events after the consumer subscribes to it.

Due to that, all clients will receive the same set of events, which makes FlowableOnSubscribe backpressure-safe.

When we have the FlowableOnSubscribe we can use it to create the Flowable:

FlowableOnSubscribe<Integer> flowableOnSubscribe
= flowable -> flowable.onNext(1);
Flowable<Integer> integerFlowable = Flowable
.create(flowableOnSubscribe, BackpressureStrategy.BUFFER);</Integer></Integer>

  1. Flowable BackpressureStrategy :

Some methods like toFlowable() or create() take a BackpressureStrategy as an argument. The BackpressureStrategy is an enumeration, which defines the backpressure behavior that we’ll apply to our Flowable.

It can cache or drop events or not implement any behavior at all, in the last case, we will be responsible for defining it, using backpressure operators.

There are five different strategies available in RxJava 2:

  1. Buffer : If we use the BackpressureStrategy.BUFFER, the source will buffer all the events until the subscriber can consume them.
  2. Drop : We can use the BackpressureStrategy.DROP to discard the events that cannot be consumed instead of buffering them.
  3. Latest : Using the BackpressureStrategy.LATEST will force the source to keep only the latest events, thus overwriting any previous values if the consumer can’t keep up.
  4. Error : When we’re using the BackpressureStrategy.ERROR, we’re simply saying that we don’t expect backpressure to occur. Consequently, a MissingBackpressureException should be thrown if the consumer can’t keep up with the source.
  5. Missing : If we use the BackpressureStrategy.MISSING, the source will push elements without discarding or buffering.