Go with the Kotlin Flow😀

What’s special about Kotlin Flow 🤔

Hot flows send or emit data even if no one is consuming them.
Cold flows emit data only if they have an active consumer.
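As a rough illustration of the difference, the sketch below counts how often a cold flow's builder block runs and contrasts it with a MutableStateFlow, used here as one example of a hot flow. The names coldVsHotDemo, builderRuns and state are made up for this example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (builder runs before collecting,
// builder runs after two collections, value read from the hot flow).
fun coldVsHotDemo(): Triple<Int, Int, Int> = runBlocking {
    var builderRuns = 0

    // Cold: the block below does not run until someone collects.
    val cold: Flow<Int> = flow {
        builderRuns++
        emit(1)
    }
    val runsBeforeCollect = builderRuns   // nobody has collected yet

    cold.collect { }                      // first collector starts the block
    cold.collect { }                      // second collector starts it again
    val runsAfterCollect = builderRuns

    // Hot: a StateFlow holds and updates its value with no collector attached.
    val state = MutableStateFlow(0)
    state.value = 5                       // updated while nobody is listening
    val seenByLateCollector = state.first()

    Triple(runsBeforeCollect, runsAfterCollect, seenByLateCollector)
}
```

The cold flow restarts its block for every collector, while the state flow keeps its latest value whether or not anyone is collecting.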

Why Flow? 😳

A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values?

This is where Flow comes in: it is built on top of coroutines. For example, we can use a flow to receive live updates from a database. A flow can also safely make a network request to produce the next value without blocking the main thread.

There are three entities involved in a stream of data: a producer, optional intermediaries, and a consumer.

fun printNumber(): Flow<Int> = flow {
    println("Flow Started")
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

2. Modifying the stream (Operators)

There are two types of operators: intermediate and terminal.

Intermediate operators are functions that are applied to an upstream flow and return a downstream flow, to which we can apply further operators.

Intermediaries can use intermediate operators to modify the stream of data without consuming the values.

Some operators do not change the data at all; they only add a behavior to the flow, for example cancellable().

Intermediate operators are not suspending functions themselves, but they accept suspending lambdas, so suspending functions can be called inside them.

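Some operators that work on the emitted values:
map()
filter()
reduce()
fold(), etc.

Note that map() and filter() are intermediate operators, while reduce() and fold() are terminal: they collect the flow and return a single value instead of a new flow.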

Terminal operators are suspending functions whose general purpose is to collect the values received from the upstream flow. Because they are suspending functions, they must be called from a coroutine.

As the name suggests, they are terminal: once one of them is applied, no further operator of any type can be applied.

toCollection(destination: C)
first() and last()
single(), etc.
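To make the two operator types concrete, here is a minimal sketch that chains intermediate operators and finishes with a terminal one. The function names evenSquares, sumOfNumbers and firstNumber are invented for this example; the operators themselves come from kotlinx.coroutines.flow.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Intermediate operators return a new flow; nothing runs until a terminal operator is called.
fun evenSquares(): List<Int> = runBlocking {
    (1..5).asFlow()
        .filter { it % 2 == 0 }   // intermediate: keeps 2 and 4
        .map { it * it }          // intermediate: turns them into 4 and 16
        .toList()                 // terminal: collects everything into a List
}

// fold() is terminal: it collects the flow and returns one accumulated value.
fun sumOfNumbers(): Int = runBlocking {
    (1..5).asFlow().fold(0) { acc, value -> acc + value }
}

// first() is terminal: it returns the first value and stops collecting.
fun firstNumber(): Int = runBlocking {
    (1..5).asFlow().first()
}
```

runBlocking is used here only so the terminal operators have a coroutine to run in; in an app they would normally be called from an existing coroutine scope.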

Flow collection can stop in only two cases: the coroutine that collects is cancelled, or the producer finishes emitting items.
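The sketch below shows collection ending in two ways: the producer running out of values, and the collecting coroutine being cancelled part-way through. The names shortNumbers, collectUntilProducerFinishes and collectUntilCancelled are hypothetical, and the 10 ms delay is arbitrary.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun shortNumbers(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(10)
        emit(i)
    }
}

// Case 1: the producer finishes, so collect() returns normally.
fun collectUntilProducerFinishes(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    shortNumbers().collect { received += it }
    received
}

// Case 2: the collecting coroutine is cancelled while the flow is still active.
fun collectUntilCancelled(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val job = launch {
        shortNumbers().collect { value ->
            received += value
            if (value == 3) cancel()   // cancels the coroutine started by launch
        }
    }
    job.join()                         // wait until the cancelled coroutine has finished
    received
}
```

In the second case the producer stops too: its next delay() call notices the cancellation and the flow block is abandoned.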

Example of Flow Stream ✍🏻

Flow is a cold stream: the code inside a flow builder does not run until the flow is collected. By default, flows are sequential, and all flow operations are executed sequentially in the same coroutine, except for a few operators specifically designed to introduce concurrency into flow execution, such as buffer and flatMapMerge.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    println("runBlocking scope: $this")
    val flow = printNumber()          // nothing is printed here: the flow is cold
    println("Calling collect function...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }   // the builder block runs again
}

fun printNumber(): Flow<Int> = flow {
    println("Flow Started")
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

Flow builders 🤹🏻‍♀️

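There are the following basic ways to create a flow:

flowOf(...) creates a flow from a fixed set of values.
asFlow() extension functions convert collections, sequences, and ranges into flows.
flow { ... } builds an arbitrary flow from sequential calls to emit().
channelFlow { ... } builds a flow from potentially concurrent calls to send().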
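As a small sketch, three of the builders provided by kotlinx.coroutines.flow can be used like this. The function name builderDemo and the variable names are made up for the example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Builds the same three values in three different ways and collects each flow into a list.
fun builderDemo(): List<List<Int>> = runBlocking {
    val fromValues: Flow<Int> = flowOf(1, 2, 3)       // fixed set of values
    val fromRange: Flow<Int> = (1..3).asFlow()        // converted from a range
    val fromBuilder: Flow<Int> = flow {               // arbitrary code that calls emit()
        for (i in 1..3) emit(i)
    }
    listOf(fromValues.toList(), fromRange.toList(), fromBuilder.toList())
}
```

All three flows are cold, so none of them produces anything until toList() collects it.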

What Are Sequential and Concurrent Flows?

Take a flow that emits ten integers with 100 ms between them:

val printNumber: Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

and a collector that takes another 100 ms to process each value:

val time = measureTimeMillis {
    printNumber.collect {
        delay(100)
        println(it)
    }
}
println("Collected in $time ms")

It takes around 2 seconds to complete, because the emitter and the collector run sequentially in the same coroutine: ten values, each costing 100 ms to emit plus 100 ms to collect.

Concurrent Coroutine

With concurrent coroutines, we can restructure the execution so that the whole operation runs faster. The emitter needs to run in a separate coroutine from the collector, so that the two can work concurrently.

Now here is the problem…

With two separate coroutines, we cannot emit a value to the collector with a direct function call. We need to establish some communication between the emitter and the collector, and this is exactly what channels are designed for.

We can send the elements via a channel from one coroutine and receive them in another one.
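A minimal sketch of that idea, using a plain Channel from kotlinx.coroutines.channels. The function name channelDemo is invented for the example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// One coroutine sends three numbers, another receives them through a channel.
fun channelDemo(): List<Int> = runBlocking {
    val channel = Channel<Int>()               // default capacity: sender and receiver meet
    launch {
        for (i in 1..3) channel.send(i)        // sender coroutine
        channel.close()                        // signals that no more elements will follow
    }
    val received = mutableListOf<Int>()
    for (value in channel) received += value   // receiver: suspends until an element arrives
    received
}
```

The for loop over the channel ends once the channel is closed and drained, which is why the sender calls close() when it is done.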

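Emitter:

// Simplified sketch of how a buffer operator can be built on top of a channel;
// the actual buffer() operator in kotlinx.coroutines is more elaborate.
fun <T> Flow<T>.buffer(size: Int = 0): Flow<T> = flow {
    coroutineScope {
        // The upstream flow is collected in a separate coroutine that sends into a channel.
        val channel = produce(capacity = size) {
            collect { send(it) }
        }
        // The downstream side receives from the channel and re-emits each value.
        channel.consumeEach { emit(it) }
    }
}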

Now we run the same emitter and collector code, adding the buffer() operator in between to reduce the execution time.

Collector:

val time = measureTimeMillis {
    printNumber.buffer().collect {
        delay(100)
        println(it)
    }
}
println("Collected in $time ms")
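To compare the two variants side by side, the sketch below measures both with the library's buffer() operator. The names tenNumbers, sequentialMillis and bufferedMillis are made up for this example, and the exact timings will vary from machine to machine.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Same shape as the emitter above: ten values, 100 ms apart.
val tenNumbers: Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

// Emitter and collector share one coroutine, so their delays add up.
fun sequentialMillis(): Long = runBlocking {
    measureTimeMillis {
        tenNumbers.collect { delay(100) }
    }
}

// buffer() runs the emitter in its own coroutine, so the delays overlap.
fun bufferedMillis(): Long = runBlocking {
    measureTimeMillis {
        tenNumbers.buffer().collect { delay(100) }
    }
}
```

The sequential version should take about 2 seconds, while the buffered one should finish noticeably sooner because the emitter prepares the next value while the collector is still busy.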

Flow Properties💁🏻‍♀️

All flow implementations must have two properties: context preservation and exception transparency.

1. Context

A flow implementation may involve more than one coroutine (and therefore more than one coroutine scope). However, the Kotlin flow builders guarantee context preservation: values are always emitted in the context of the collector.

In ordinary suspending functions we can use withContext() to change the dispatcher, but code inside a flow builder is not allowed to emit from a different context, because that would break context preservation. Attempting to emit from inside withContext() throws an IllegalStateException at runtime.

To change the context, we can use the flowOn() operator. The flowOn operator only changes the coroutine context of the builder and of the operators applied upstream of it.
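Both behaviors can be checked with a short sketch. The function names emitInsideWithContextFails and flowOnThreads are invented for the example, and reading Thread.currentThread() assumes the code runs on the JVM.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Not allowed: emitting from a different context inside the builder.
fun emitInsideWithContextFails(): Boolean = runBlocking {
    val wrong: Flow<Int> = flow {
        withContext(Dispatchers.Default) { emit(1) }
    }
    try {
        wrong.collect { }
        false
    } catch (e: IllegalStateException) {
        true   // the flow invariant check rejected the emission
    }
}

// Allowed: flowOn moves the builder to another dispatcher, the collector stays where it is.
fun flowOnThreads(): Pair<String, String> = runBlocking {
    var upstreamThread = ""
    val right: Flow<Int> = flow {
        upstreamThread = Thread.currentThread().name
        emit(1)
    }.flowOn(Dispatchers.Default)   // affects only the builder above this line

    var collectorThread = ""
    right.collect { collectorThread = Thread.currentThread().name }
    Pair(upstreamThread, collectorThread)
}
```

With flowOn, the builder runs on a Dispatchers.Default worker thread, while the collector keeps running on the thread of the coroutine that called collect.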

2. Exception transparency

Inside a flow builder, emit() must not be wrapped in a try/catch block, because that block would also catch exceptions thrown by the collector and break exception transparency.

Flow collection can complete with an exception. To handle it, we should use the catch operator, which is designed to catch exceptions coming from the upstream flow. There are other operators for handling exceptions in a flow, such as retry.

The catch operator catches only upstream exceptions.
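A minimal sketch of both sides of that rule follows. The function names catchDemo and downstreamNotCaught, and the fallback value -1, are made up for this example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Upstream failure: catch sees it and can emit a fallback value.
fun catchDemo(): List<Int> = runBlocking {
    val risky: Flow<Int> = flow {
        emit(1)
        emit(2)
        throw IllegalStateException("upstream failure")
    }
    risky
        .catch { emit(-1) }   // handles the exception thrown above it
        .toList()
}

// Downstream failure: the collector throws below catch, so catch does not see it.
fun downstreamNotCaught(): Boolean = runBlocking {
    try {
        flowOf(1)
            .catch { emit(-1) }
            .collect { error("collector failure") }
        false
    } catch (e: IllegalStateException) {
        true   // the exception escaped the catch operator and reached the caller
    }
}
```

Because catch only looks upstream, its position in the operator chain decides which failures it can handle.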

To be continued…


I am an Android Developer. Working for BMW group. In my development path if I find something interesting I like to share with you guys. Happy coding…

Prachi Mishra
