Flow and Channels Flashcards
What is Flow
Flow is a stream of values that are computed asynchronously. Flow is asynchronous data stream. It’s part of Kotlin coroutine library.
Flows are “cold” streams, meaning that the code inside a flow builder does not run until the flow is collected. This is in contrast to “hot” streams (like channels in Kotlin), where data can be produced regardless of whether there are collectors.
Unlike a regular collection (return values at once) or sequence (thread blocking) flow emits values continuously and asynchronously without blocking the thread on which values are consumed.
Base Flow Builders
flowOf() - create a Flow from fixed set of values: flowOf(1,2,3)
.asFlow() - extension on various types to convert them int Flows: listOf(1,2,3).asFlow()
flow {} - builder function to construct arbitrary flows: ```
flow {
emit(1)
emit(2)
emit(3)
}
~~~
Base Terminal operators
collect() - Collects the emitted values from the flow. This is the most basic form of terminal operation, where you can process each value as it is emitted.
toList() and toSet() - These operators collect the emitted values into a List or Set, respectively, and return it.
first() - Returns the first value emitted by the flow and then cancels the flow’s collection.
last() - Returns the last value emitted by the flow before it completes
single(): Expects the flow to emit a single value and return one value. It throws an exception if the flow emits more than one value.
count(): Counts the number of items emitted by the flow.
take(n): Takes the first n values from the flow and then cancels the collection.
LaunchIn
launchIn is not a suspend function, and therefore it can be called outside of a coroutine.
~~~
val scope = CoroutineScope(EmptyCoroutineContext)
flow
.onEach{
println(it)
}
.launchIn(scope)
~~~
это аналог
~~~
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch{
flow.collect {
println(it)
}
}
~~~
Lifecycle operators
onStart{}
onCompleted{cause} if cause == null thencompleted without exceptions
Intermediate operators
map{} - modify the emission or transform one type into another.
filter{} - filter the emission based on specific criteria.
transform{} - similar to map, but can emit multiple values. Example: transform { emit(it); emit(it*10) }
withIndex{} - returns IndexValue(index, value).
distinctUntilChanged - if the next element is the same as the previous one, it will not be emitted.
Example: flowOf(1,1,2,3,1).distinctUntilChanged() -> 1,2,3,1
Exception Transparency
Exception Transparency — rules:
1. a downstream exception must always be propagated to the collector. Or downstream exception shouldn’t becaught by upstream operators
2. If exception was passed to downstream then Flow should stop the emission
Exception Transparency vialotion example
~~~
flow {
try {
emit(1)
} catch (e: Exception) {
println(“Catch exception in flow builder.”)
}
}.collect { emittedValue ->
println(emittedValue)
throw Exception(“Exception in collect{}”)
}
~~~
this is equals to
~~~
val inlinedFlow = flow<Int> {
try {
println(1)
throw Exception("Exception in collect{}")
} catch (e: Exception) {
println("Catch exception in flow builder.")
}
}
~~~</Int>
Exception Handling
it’s possible to use try catch but better to use catch{} operator:
flow { emitData() } .map { computeOne(it) } .catch { emitAll(newValues) } // catches exceptions in emitData and computeOne .catch { ... } // catches exceptions in new flow from previous catch block .map { computeTwo(it) } .collect { process(it) } // throws exceptions from process and computeTwo
Shared and State Flow pros an cons over Livedata
Pros:
- A Single type of observable data holder throughout your architecture
- No knowledge about LiveData necessary
- More flow operators
- ViewModels are decoupled from Android Dependencies
- Simplified testing
Cons
- Boilerplate code
- Complexity. Easy make mistake and flow will emit data even if app is in the backgroud
Cold Flow vs Hot Flow
Cold Flow
- become active on collection
- become inactive on cancellation of the collecting coroutine
- emit individual emissions to every collector. Multiple collectors create multiple flows
Hot Flow
- are active regardless of whether there are collectors
- stay active even when there is no more collector
- emissions are shared between all collectors. Multiple collectors share one flow
Shared Flow vs State Flow
Shared Flow vs State Flow
Initial Value: No
Replay Cache: customizable
Emission of subsequent equal values: yes
State Flow
Initial Value: yes
Replay Cache: 1
Emission of subsequent equal values: no
Channel
A channel is a primitive, a mechanism to organize a queue for message exchange between coroutines. Simply put, it’s like a pipe through which a sender can send messages and a receiver can receive them. A single channel can have multiple senders and multiple receivers.
Interface Channel<E>: ReceiveChannel<E>, SendChannel<E></E></E></E>
Produce builder
Create a channel that can send messages
~~~
val channel: ReceiveChannel<String> =
scope.produce {
while (true) {
send("sending...")
}
}
~~~</String>
Actor builder
Create a channel that can receive messages
~~~
val channel: SendChannel<String> =
scope.actor {
val data: String = receive()
println(data)
}
~~~</String>
Close channel
A channel operates until the close method is called. You need to handle potential errors using try-catch blocks, which can occur when calling the send or receive methods.
~~~
try {
channel.send(1)
} catch(e: ClosedSendCHannelException) {
// обрабатываем ошибку в случае закрытия канала
}
~~~