Go with the Kotlin Flow😀

Prachi Mishra
5 min read · Sep 4, 2022

What’s special about Kotlin Flow 🤔

  • An asynchronous data stream that emits values sequentially and completes either normally or with an exception.
  • Kotlin Flow is an implementation of reactive programming that can be used to model a stream of data.
  • Flows are built on top of coroutines and can provide multiple values.
  • All emitted values must be of the same type.
  • Flows can be hot or cold; by default, Kotlin flows are cold.
  • Interoperability with other reactive streams and with coroutines.
  • Supports Kotlin Multiplatform.
  • Fewer, simpler operators.
  • Structured concurrency.
  • Null safety in streams.

Hot flows send or emit data even if no one is consuming them.
Cold flows emit data only if they have an active consumer.
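
To make the difference concrete, here is a minimal sketch. It assumes the kotlinx.coroutines library is on the classpath; the helper name coldVersusHot is made up for illustration. A MutableStateFlow stands in for a hot flow because it keeps changing state even when nothing is collecting.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (did the cold builder run before collection?, latest hot value).
fun coldVersusHot(): Pair<Boolean, Int> = runBlocking {
    var builderRan = false

    // Cold: this block is only a recipe until someone collects the flow.
    val cold: Flow<Int> = flow {
        builderRan = true
        emit(1)
    }
    val ranBeforeCollect = builderRan // still false, nobody has collected yet

    // Hot: the state is updated even though there is no collector.
    val hot = MutableStateFlow(0)
    hot.value = 1
    hot.value = 2
    val latest = hot.first() // a late collector only sees the current value

    cold.first() // collecting is what finally runs the builder
    check(builderRan)

    ranBeforeCollect to latest
}

fun main() {
    val (ranBeforeCollect, latest) = coldVersusHot()
    println("cold builder ran before collect: $ranBeforeCollect") // false
    println("latest hot value: $latest") // 2
}
```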

Why Flow? 😳

A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values?

This is where Flow, built on top of coroutines, comes in. For example, we can use a flow to receive live updates from a database. The flow can safely make a network request to produce the next value without blocking the main thread.

There are three entities involved in a stream of data:

  • Producer — produces data that is added to the stream. Thanks to coroutines, flows can also produce data asynchronously.
  • (Optional) Intermediaries — can modify each value emitted into the stream or the stream itself.
  • Consumer — consumes the values from the stream.

1. Flow Creator (Producer)

fun printNumber(): Flow<Int> = flow {
    println("Flow Started")
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

2. Modifying the stream (Operators)

There are two types of operators:

  • Intermediate operators

They are functions that are applied to an upstream flow and return a downstream flow, to which further operators can be applied.

Intermediaries can use intermediate operators to modify the stream of data without consuming the values.

Some operators don't change the emitted values at all; they only add a feature to the flow. One example is cancellable().

Intermediate operators are not suspending functions themselves, but the lambdas passed to them can call suspending functions.

Some intermediate operators that modify the emitted values or the stream itself:
map()
filter()
transform()
take() etc.

Note that reduce() and fold() consume the flow and return a single value, so they are terminal operators, not intermediate ones.
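
As a rough illustration of chaining intermediate operators, the sketch below filters and maps a small flow. The function name evenSquares is hypothetical, and toList() is used only to trigger collection. The delay inside map shows that an operator's lambda may call suspending functions.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical example: nothing runs here, the operators only describe a new flow.
fun evenSquares(): Flow<Int> =
    (1..6).asFlow()
        .filter { it % 2 == 0 } // keep 2, 4, 6
        .map { n ->
            delay(10) // suspending call inside an operator lambda
            n * n
        }

fun main() = runBlocking {
    // The chain only executes once a terminal operator such as toList() is applied.
    println(evenSquares().toList()) // [4, 16, 36]
}
```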

  • Terminal operators

These operators are suspending functions, and their general purpose is to collect the values received from the upstream flow. Because they are suspending functions, they must be called from a coroutine.

As the name suggests, they are terminal: once one of them is applied, no further operator (of any type) can be chained after it.

collect()
toList() and toCollection(destination: C)
first() and last()
single()
reduce() and fold() etc.
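
A small sketch of a few terminal operators applied to the same flow follows. The helper name summarize and the map keys are made up for illustration. Each call is a suspending function, and because the flow is cold, each call collects it again from the start.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: every entry below triggers a separate collection of the flow.
suspend fun summarize(numbers: Flow<Int>): Map<String, Any> = mapOf(
    "first" to numbers.first(),                                 // 1
    "last" to numbers.last(),                                   // 4
    "list" to numbers.toList(),                                 // [1, 2, 3, 4]
    "sum" to numbers.reduce { acc, value -> acc + value },      // 10
    "sumFrom100" to numbers.fold(100) { acc, value -> acc + value } // 110
)

fun main() = runBlocking {
    println(summarize(flowOf(1, 2, 3, 4)))
}
```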

Flow collection normally stops in one of two cases:

  • The producer finishes emitting items. In this case, the stream of data is closed and the coroutine that called collect resumes execution.
  • The coroutine that collects is cancelled. This also stops the underlying producer.
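
The second case can be sketched as follows. This is only an illustration: the name collectUntilCancelled is hypothetical, and the finally block exists purely to observe that the producer stops once the collecting coroutine is cancelled.

```kotlin
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the values received and whether the producer was stopped.
fun collectUntilCancelled(): Pair<List<Int>, Boolean> = runBlocking {
    val received = mutableListOf<Int>()
    var producerStopped = false

    val numbers = flow {
        try {
            for (i in 1..5) {
                delay(50)
                emit(i)
            }
        } finally {
            producerStopped = true // runs when the producer finishes or is cancelled
        }
    }

    val job = launch {
        numbers.collect { value ->
            received += value
            if (value == 2) cancel() // cancel the collecting coroutine itself
        }
    }
    job.join()

    received to producerStopped
}

fun main() {
    println(collectUntilCancelled()) // ([1, 2], true)
}
```

The producer never reaches 3: its next delay call notices the cancellation and the flow is torn down.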

Example of Flow Stream ✍🏻

A flow is a cold stream: the code inside a flow builder does not run until the flow is collected, and it runs again for every new collection. By default, flows are sequential, and all flow operations are executed sequentially in the same coroutine, except for a few operations specifically designed to introduce concurrency into flow execution, such as buffer and flatMapMerge.

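import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    println("runBlocking scope: $this")
    val flow = printNumber() // nothing runs yet: the flow is cold
    println("Calling collect function...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) } // the builder runs again from the start
}

fun printNumber(): Flow<Int> = flow {
    println("Flow Started")
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}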

Flow builders 🤹🏻‍♀️

There are the following basic ways to create a flow:

  • flowOf(…) functions to create a flow from a fixed set of values.
  • asFlow() extension functions on various types to convert them into flows.
  • flow { … } builder function to construct arbitrary flows from sequential calls to emit function (above example).
  • channelFlow { … } builder function to construct arbitrary flows from potentially concurrent calls to the send function.
  • MutableStateFlow and MutableSharedFlow define the corresponding constructor functions to create a hot flow that can be directly updated.
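
The builders listed above might be used roughly as follows. The function names are invented for this sketch; only the builder calls themselves come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// flowOf: a fixed set of values
fun fixedValues(): Flow<Int> = flowOf(1, 2, 3)

// asFlow: convert an existing collection or range
fun fromCollection(): Flow<Int> = listOf(1, 2, 3).asFlow()

// flow { }: sequential calls to emit
fun fromBuilder(): Flow<Int> = flow {
    for (i in 1..3) emit(i)
}

// channelFlow { }: send may be called from several coroutines concurrently
fun fromChannelFlow(): Flow<Int> = channelFlow {
    launch { send(1) }
    launch { send(2) }
}

fun main() = runBlocking {
    println(fixedValues().toList())
    println(fromCollection().toList())
    println(fromBuilder().toList())
    println(fromChannelFlow().toList().sorted()) // sorted: arrival order is not guaranteed

    // MutableStateFlow: a hot flow that can be updated directly
    val state = MutableStateFlow(0)
    state.value = 5
    println(state.value)
}
```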

What are Sequential and Concurrent Flows?

Take the example of a flow that emits ten integers with 100 ms between them:

val printNumber: Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

and a collector that processes each value with a delay of 100 ms:

val time = measureTimeMillis {
    printNumber.collect {
        delay(100)
        println(it)
    }
}
println("Collected in $time ms")

It takes around 2 seconds to complete, because the emitter and the collector run sequentially in the same coroutine: ten values, each costing 100 ms to emit plus 100 ms to collect.

Concurrent Coroutines

With concurrent coroutines, we can restructure the execution so that the operation runs faster. The emitter needs to run in a separate coroutine from the collector so that the two can work concurrently.

Now here is the problem…

With two separate coroutines, we can no longer pass a value from the emitter to the collector with a plain function call. We need to establish some communication between them, and channels are designed for exactly this purpose.

We can send the elements via a channel from one coroutine and receive them in another one.

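Emitter:

// Simplified sketch of a buffer-style operator, not the library's actual implementation.
// The name simpleBuffer and the size parameter are illustrative.
fun <T> Flow<T>.simpleBuffer(size: Int): Flow<T> = flow {
    coroutineScope {
        // Collect the upstream flow in a separate coroutine and send its values into a channel
        val channel = produce(capacity = size) {
            collect { send(it) }
        }
        // Receive from the channel and emit downstream in the collector's coroutine
        channel.consumeEach { emit(it) }
    }
}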

Running the same emitter and collector code with the buffer() operator added in between lets the two run concurrently, so the total time drops to a little over 1 second.

Collector:

val time = measureTimeMillis {
    printNumber.buffer().collect {
        delay(100)
        println(it)
    }
}
println("Collected in $time ms")

Flow Properties💁🏻‍♀️

All flow implementations must adhere to two properties:

1. Context

  • The context in which the flow emits values is never leaked to the downstream flow receiver.
  • The values are produced in only one coroutine scope.

Some flows allow more than one coroutine (that is, more than one coroutine scope). However, the Kotlin flow builders guarantee context preservation.

In suspending functions, we could use withContext() to change the dispatcher, but code inside the flow builder is not allowed to emit from a different context, because that would break context preservation. Calling emit from inside withContext() throws an IllegalStateException at runtime.

To change the context, we can use the flowOn() operator. The flowOn operator changes the coroutine context only for the builder and the operators applied upstream of it.
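
A minimal sketch of flowOn, assuming the JVM (it reads Thread.currentThread()); the function name producerThreadNames is hypothetical. The builder runs on Dispatchers.Default, while the collector stays in the coroutine that called collect.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Hypothetical example: emits the name of the thread the builder is running on.
fun producerThreadNames(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.Default) // affects only the builder above, not the collector below

fun main() = runBlocking {
    producerThreadNames().collect { producerThread ->
        val collectorThread = Thread.currentThread().name
        println("produced on $producerThread, collected on $collectorThread")
    }
}
```

The produced value should name a DefaultDispatcher worker thread, while the collecting side reports the thread that runBlocking was started on.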

2. Exception transparency

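Inside a flow builder, emit calls must not be wrapped in a try/catch block, because that would also catch exceptions thrown by the downstream collector and hide them.

Flow collection can complete with an exception. To handle exceptions, use the catch operator, which is designed to catch exceptions coming from the upstream flow. There are other operators for dealing with failures in a flow as well, such as retry.

The catch operator catches only upstream exceptions; an exception thrown in an operator or collector placed after it is not caught.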
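
As an illustration, the sketch below recovers from an upstream failure by emitting a fallback value. The names riskyNumbers and withFallback, and the fallback value -1, are made up for the example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical producer that fails after two values.
fun riskyNumbers(): Flow<Int> = flow {
    emit(1)
    emit(2)
    throw IllegalStateException("upstream failure")
}

// catch sees the exception from the builder above it and may emit a replacement value.
fun withFallback(): Flow<Int> = riskyNumbers()
    .catch { emit(-1) }

fun main() = runBlocking {
    println(withFallback().toList()) // [1, 2, -1]
}
```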

To be continued…

Written by Prachi Mishra

I work as an Android developer and am currently building an app for my dream car (BMW). When I find something interesting along my dev path, I like to share it with you.