RxJava Flashcards
What’s RxJava?
RxJava is a reactive programming library for composing asynchronous and event-based programs by using observable sequences.
Reactive programming is based on data streams and the propagation of change. With reactive programming, you can express static data streams (such as arrays) or dynamic ones (such as event emitters) with ease.
Where can you use RxJava?
RxJava can be used in many places. The most common are:
- Network calls (such as HTTP API calls with Retrofit, which fully supports RxJava);
- UI events that should trigger actions;
- Database reads and writes, and file system access;
- Data coming from sensors;
- Etc.
What are the main elements of RxJava?
- Observable
- Operator
- Observer
Describe Observable
An Observable is where the data stream comes from: it does some work and emits values.
You can think of it as a Speaker.
Describe Operator
An Operator transforms the data from one form to another.
You can think of it as a Translator.
Describe Observer
An Observer receives the values.
You can think of it as a Listener.
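The three elements can be seen together in a short chain. This is a minimal sketch that assumes the RxJava 2 dependency; the function name `wordLengths` is made up for illustration.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: returns what the Observer received so it can be inspected.
fun wordLengths(words: List<String>): List<Int> {
    val received = mutableListOf<Int>()
    Observable.fromIterable(words)        // Observable: the source that emits values
        .map { it.length }                // Operator: transforms each value
        .subscribe { received.add(it) }   // Observer: receives the transformed values
    return received
}

fun main() {
    println(wordLengths(listOf("hello", "rx"))) // [5, 2]
}
```

No Scheduler is involved here, so the values are delivered before `subscribe` returns.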
What do you need to use RxJava in an Android project?
You need to add the dependencies to the app-level build.gradle:
implementation "io.reactivex.rxjava2:rxjava:2.2.7"
implementation "io.reactivex.rxjava2:rxandroid:2.1.1"
Different ways of creating an Observable
- just - The just operator converts an item into an Observable and emits it
- from* - For example fromArray, fromIterable
- create - This way you can create an Observable from the ground up
- interval - This function will create an infinite sequence of ticks, separated by the specified duration.
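The four creation styles listed above might look like this in practice. It is a sketch assuming RxJava 2; the function names are hypothetical, and `toList().blockingGet()` is used only to collect the emissions for inspection.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

// just: wraps the given item in an Observable
fun viaJust(): List<String> =
    Observable.just("A").toList().blockingGet()

// fromIterable: emits each element of an existing collection
fun viaFromIterable(): List<Int> =
    Observable.fromIterable(listOf(1, 2, 3)).toList().blockingGet()

// create: emits values manually through the emitter
fun viaCreate(): List<String> =
    Observable.create<String> { emitter ->
        emitter.onNext("first")
        emitter.onNext("second")
        emitter.onComplete()
    }.toList().blockingGet()

// interval: an infinite sequence of ticks (0, 1, 2, ...); take(3) bounds it
fun viaInterval(): List<Long> =
    Observable.interval(10, TimeUnit.MILLISECONDS).take(3).toList().blockingGet()

fun main() {
    println(viaJust())         // [A]
    println(viaFromIterable()) // [1, 2, 3]
    println(viaCreate())       // [first, second]
    println(viaInterval())     // [0, 1, 2]
}
```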
What is Backpressure?
Backpressure is the mechanism for handling an emitter that produces items faster than its subscriber can consume them. Let's say an Observable produces 100000 items per second: how will a subscriber that can only handle 100 items per second process those items?
The Observable class has no backpressure support. It buffers everything without bound and pushes it to the subscriber, so if the source emits more than the subscriber can handle, you can eventually run into an OutOfMemoryError.
We can handle such a stream if we apply Backpressure to the items as needed. That way, unnecessary items can be discarded, or the producer can be told when to create and push the next item.
How can you solve the Backpressure problem?
Instead of an Observable, you can use a Flowable, which takes Backpressure into account and handles it for you, whereas Observable does not.
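To make the idea concrete, here is a minimal sketch assuming RxJava 2 and its Reactive Streams interfaces. `BoundedSubscriber` is a hypothetical helper, not part of the library: it asks the producer for a fixed number of items and records what arrives.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.Observable
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Hypothetical helper: requests `demand` items up front and records what arrives.
class BoundedSubscriber(private val demand: Long) : Subscriber<Int> {
    val received = mutableListOf<Int>()
    var completed = false

    override fun onSubscribe(s: Subscription) {
        s.request(demand) // tell the producer how many items we can handle
    }
    override fun onNext(item: Int) { received.add(item) }
    override fun onError(t: Throwable) { t.printStackTrace() }
    override fun onComplete() { completed = true }
}

fun main() {
    // A Flowable source honours the request: it emits 3 items, then waits for more demand.
    val slow = BoundedSubscriber(3)
    Flowable.range(1, 10).subscribe(slow)
    println(slow.received) // [1, 2, 3]

    // An Observable converted with the DROP strategy: items beyond the demand are discarded.
    val dropping = BoundedSubscriber(3)
    Observable.range(1, 1000).toFlowable(BackpressureStrategy.DROP).subscribe(dropping)
    println(dropping.received) // [1, 2, 3]
}
```

`BackpressureStrategy` also offers `BUFFER`, `LATEST`, `ERROR`, and `MISSING`; which one fits depends on whether losing items is acceptable.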
Emitter types in RxJava
- Observable
- Flowable
- Maybe
- Single
- Completable
Flowable emitter type
It works exactly like an Observable but it supports Backpressure.
Maybe emitter type
This class is used when you'd like to return a single optional value. Its callbacks are mutually exclusive; in other words, only one of them is called. If a value is emitted, it calls onSuccess; if there is no value, it calls onComplete; and if there is an error, it calls onError.
Single emitter type
It’s used when there’s a single value to be returned. If we use this class and there is a value emitted, onSuccess will be called. If there’s no value, onError will be called.
Completable emitter type
A Completable won't emit any data. It only lets you know whether the operation completed successfully: if it did, it calls onComplete, and if it didn't, it calls onError. A common use case for Completable is REST APIs, where a successful call returns HTTP 204 and other outcomes can range from HTTP 301 to HTTP 404, HTTP 500, etc. We might do something with that information.
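The callbacks of Maybe, Single, and Completable can be compared side by side. The sketch below assumes RxJava 2; the `describe*` functions are hypothetical helpers that report which callback fired.

```kotlin
import io.reactivex.Completable
import io.reactivex.Maybe
import io.reactivex.Single

// Maybe: exactly one of onSuccess, onError, or onComplete is called.
fun describeMaybe(source: Maybe<String>): String {
    var result = ""
    source.subscribe(
        { value -> result = "onSuccess($value)" },
        { error -> result = "onError(${error.message})" },
        { result = "onComplete" }
    )
    return result
}

// Single: either onSuccess or onError.
fun describeSingle(source: Single<String>): String {
    var result = ""
    source.subscribe(
        { value -> result = "onSuccess($value)" },
        { error -> result = "onError(${error.message})" }
    )
    return result
}

// Completable: either onComplete or onError, never a value.
fun describeCompletable(source: Completable): String {
    var result = ""
    source.subscribe(
        { result = "onComplete" },
        { error -> result = "onError(${error.message})" }
    )
    return result
}

fun main() {
    println(describeMaybe(Maybe.just("x")))                            // onSuccess(x)
    println(describeMaybe(Maybe.empty()))                              // onComplete
    println(describeSingle(Single.just("y")))                          // onSuccess(y)
    println(describeCompletable(Completable.complete()))               // onComplete
    println(describeCompletable(Completable.error(RuntimeException("boom")))) // onError(boom)
}
```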
Can you manually call the methods doOnSubscribe, doOnNext, etc.?
Yes. You can add the methods doOnSubscribe, doOnNext, doOnError, and doOnComplete to a chain yourself to hook into each of those events:
Observable.just("Hello")
    .doOnSubscribe { println("Subscribed") }
    .doOnNext { s -> println("Received: $s") }
    .doAfterNext { println("After Receiving") }
    .doOnError { e -> println("Error: $e") }
    .doOnComplete { println("Complete") }
    .doFinally { println("Do Finally!") }
    .doOnDispose { println("Do on Dispose!") }
    .subscribe { println("Subscribe") }
Is RxJava by default multithreaded?
Although RxJava is heavily marketed as an asynchronous way of doing reactive programming, it's important to clarify that RxJava is single-threaded by default. You need to specify otherwise, and that's where Schedulers come in.
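A quick way to see this default is to record the thread an item is delivered on. This is a sketch assuming RxJava 2, with a hypothetical function name; no Scheduler is specified anywhere in the chain.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: returns the name of the thread the Observer ran on.
fun deliveryThreadName(): String {
    var name = ""
    Observable.just("Hello")
        .subscribe { name = Thread.currentThread().name }
    return name
}

fun main() {
    // Without a Scheduler, the item is delivered on the thread that called subscribe.
    println(deliveryThreadName() == Thread.currentThread().name) // true
}
```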
Define Synchronous programming
With Synchronous programming, only one thing happens at a time. The code calls method a, which reads from or writes to the database, and waits for a to finish before moving on to b. So you get one thing happening at a time, and it's the most common cause of UI freezes, since the code also runs on the same thread as the UI.
Define Asynchronous programming
With Asynchronous programming, you can call many methods at once, without waiting for one to finish before starting another. It's one of the most fundamental aspects of Android development: you do not want to run all your code on the same thread as the UI, especially computational code.
subscribeOn method: what does it do?
With subscribeOn you decide which thread your Emitter (such as Observable, Flowable, Single, etc.) is executed on. subscribeOn (like observeOn) takes a Scheduler parameter that tells it which thread to run on.
What is the most common type of Scheduler?
Schedulers.io(). This is the most commonly used type of Scheduler. It's generally used for IO-related work, such as network requests and file system operations, and it's backed by a thread pool. A Java thread pool is a group of worker threads that wait for jobs and are reused many times.
Types of Schedulers
- Schedulers.io()
- Schedulers.computation()
- Schedulers.newThread()
- Schedulers.single()
- Schedulers.trampoline()
- Executor Scheduler (Schedulers.from(executor))
- AndroidSchedulers.mainThread()
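Most entries in this list are ready-made, but the Executor Scheduler wraps a thread pool that you supply. A minimal sketch, assuming RxJava 2; the function name and the thread name "my-pool" are made up for illustration:

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

// Hypothetical helper: runs the source on a custom executor and reports the thread used.
fun runOnCustomPool(): String {
    // A single-thread executor whose thread gets a recognisable name.
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "my-pool") }
    try {
        return Observable.just(1)
            .subscribeOn(Schedulers.from(executor)) // wrap the executor as a Scheduler
            .map { Thread.currentThread().name }
            .blockingFirst()
    } finally {
        executor.shutdown() // the caller owns the executor and must shut it down
    }
}

fun main() {
    println(runOnCustomPool()) // my-pool
}
```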
observeOn method: how does it work?
The method subscribeOn() tells the source Observable which thread to emit items on, and by default those emissions are pushed to our Observer on that same thread. But if there is an observeOn() in the chain, the emissions switch to the scheduler passed to observeOn() for the rest of the chain.
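The thread switch can be observed by recording thread names on both sides of observeOn(). This sketch assumes RxJava 2 on a plain JVM, so Schedulers.computation() stands in for AndroidSchedulers.mainThread(); the function name is hypothetical.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: returns (thread before observeOn, thread after observeOn).
fun threadsUsed(): Pair<String, String> =
    Observable.just("item")
        .subscribeOn(Schedulers.io())                 // the source emits on an IO thread
        .map { Thread.currentThread().name }          // still upstream: IO thread
        .observeOn(Schedulers.computation())          // everything below switches threads
        .map { upstream -> upstream to Thread.currentThread().name }
        .blockingFirst()

fun main() {
    val (upstream, downstream) = threadsUsed()
    println(upstream)   // an IO thread, named like RxCachedThreadScheduler-1
    println(downstream) // a computation thread, named like RxComputationThreadPool-1
}
```

The position of subscribeOn() in the chain does not matter for the source, while observeOn() only affects the operators that come after it.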
Transformer in RxJava: how does it work?
With a transformer, we can avoid repeating code by packaging the operator chains we apply most often to our Observables into one reusable unit. For example:
override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)

    Observable.just("Apple", "Orange", "Banana")
        .compose(applyObservableAsync())
        .subscribe { v -> println("The First Observable Received: $v") }

    Observable.just("Water", "Fire", "Wood")
        .compose(applyObservableAsync())
        .subscribe { v -> println("The Second Observable Received: $v") }
}

// Reusable transformer: do the work on the IO scheduler, deliver results on the main thread.
fun <T> applyObservableAsync(): ObservableTransformer<T, T> {
    return ObservableTransformer { observable ->
        observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
    }
}