Kotlin Flows and Channels
In modern software development, asynchronous programming has become a crucial aspect of building responsive and efficient applications. Kotlin Flows, part of the kotlinx.coroutines library, are designed to handle asynchronous stream processing in a more intuitive and structured way.
Flows act as a pipeline through which data can flow asynchronously, enabling reactive-style programming.
In a reactive-style system, data streams are treated as first-class citizens. Components within the system can subscribe to these streams to receive data updates and react accordingly.
In Android application development, we use Flow wherever data arrives continuously in the form of streams. There are several scenarios in which data arrives as a stream.
Kotlin offers two main tools for dealing with streams: Flows and Channels.
In Kotlin's Flow, the concepts of producers and consumers are key to understanding data streaming and how Flow handles asynchronous data.
Producers in Flow: A producer is responsible for emitting data items into the stream. It defines how and when data is generated and sent downstream for consumption.
Consumers in Flow: A consumer is responsible for collecting the data emitted by producers. Consumers actively observe and handle the values they receive.
Channels are hot, and Flows are mostly cold. A hot producer keeps producing data whether or not any consumer is consuming it. When a consumer subscribes to a hot stream, it receives only the values emitted from that point forward. Hot streams typically represent data sources that do not stop or reset for each subscriber. A cold producer, by contrast, does not produce any data until it has a consumer. Each time a consumer starts collecting from a cold stream, the stream begins emitting values independently for that specific consumer.
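The cold behavior described above can be observed with a small sketch. The names coldNumbers and producerRuns are hypothetical and exist only for this illustration; the counter simply records how many times the flow body has started.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter: how many times the flow body has been started.
var producerRuns = 0

// A cold flow: the block runs only when a consumer starts collecting.
fun coldNumbers(): Flow<Int> = flow {
    producerRuns++
    emit(1)
    emit(2)
    emit(3)
}

fun main() = runBlocking {
    val numbers = coldNumbers()
    println("Runs before collecting: $producerRuns")

    // Each terminal call restarts the producer from the beginning.
    val first = numbers.toList()
    val second = numbers.toList()
    println("First: $first, second: $second")
    println("Runs after two collections: $producerRuns")
}
```

Creating the flow does not start the producer; each of the two toList() calls runs the body once, so both consumers see the full sequence.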
send(): The send function sends data into a channel from a coroutine. This enables one coroutine to communicate with another by passing values through a channel. With an unbuffered channel, send suspends the coroutine until a receiver takes the value; with a buffered channel, it suspends only while the buffer is full.
receive(): The receive function gets values from a channel asynchronously. It suspends until a value is available, allowing one coroutine to retrieve data that another coroutine has sent into the channel.
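As a minimal sketch of send() and receive() working together, the helper below passes a list of values through an unbuffered channel. The function name roundTrip is an assumption made for this example and is not part of the coroutines library.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends the values through a channel and returns what was received.
suspend fun roundTrip(values: List<Int>): List<Int> = coroutineScope {
    val channel = Channel<Int>() // unbuffered (rendezvous) channel

    // Sender coroutine: each send() suspends until the receiver takes the value.
    launch {
        for (v in values) channel.send(v)
        channel.close()
    }

    // Receiver: each receive() suspends until a value is available.
    val received = mutableListOf<Int>()
    repeat(values.size) { received += channel.receive() }
    received
}

fun main() = runBlocking {
    println(roundTrip(listOf(1, 2, 3)))
}
```

Values arrive in the order they were sent, because a channel behaves like a queue.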
Basic Steps of Code to Create Flow
We can create a flow using the flow{...} builder, and inside it we can emit values using the emit() function.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun producer() = flow<Int> {
    val list = listOf<Int>(1, 2, 3, 4, 5)
    list.forEach {
        delay(1000) // Wait one second between values
        emit(it) // Emit the value
    }
}
We can collect the values in the consumer using the collect() function.
GlobalScope.launch {
    val data: Flow<Int> = producer().buffer(3)
    data.map {
        it * 2 // Double each value: 2, 4, 6, 8, 10
    }.filter { it < 8 }.collect { // Keep only values below 8
        println("From Collector-1 $it")
    }
}
Output:
From Collector-1 2
From Collector-1 4
From Collector-1 6
Flow operators:
Flow operators fall into two categories:
Non-Terminal Operators (map, filter, and buffer in the example above):
Terminal Operators:
A terminal operator starts the flow. It is the final step in a flow pipeline, and every terminal operator is a suspend function. Without a terminal operator, the flow remains cold and does not execute any of its intermediate operations or events.
Here are some commonly used terminal operators in Flow:
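A short sketch can make this laziness visible. The function lazinessDemo is a hypothetical name used only here; it counts side effects in onEach before and after a terminal operator is called.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns (side effects before a terminal operator, side effects after it).
fun lazinessDemo(): Pair<Int, Int> = runBlocking {
    var sideEffects = 0

    // Only an intermediate operator so far: this describes the pipeline but runs nothing.
    val pipeline = flowOf(1, 2, 3).onEach { sideEffects++ }
    val before = sideEffects

    // toList() is a terminal operator, so the pipeline actually executes here.
    pipeline.toList()
    val after = sideEffects

    before to after
}

fun main() {
    println(lazinessDemo())
}
```

The counter stays at zero until toList() is called, after which onEach has run once per emitted value.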
(1) collect : The primary terminal operator used to collect values emitted by the flow.
flowOf(1, 2, 3).collect { value ->
    println(value) // Prints 1, 2, 3 (one per line)
}
(2) toList: Collects all values emitted by the flow into a list.
val list = flowOf(1, 2, 3).toList()
println(list) // Outputs: [1, 2, 3]
(3) toSet: Collects all values into a set, removing duplicates.
val set = flowOf(1, 2, 2, 3).toSet()
println(set) // Outputs: [1, 2, 3]
(4) first: Returns the first value emitted by the flow and then stops collecting.
val firstValue = flowOf(1, 2, 3).first()
println(firstValue) // Outputs: 1
(5) last: Collects and returns the last value emitted by the flow.
val lastValue = flowOf(1, 2, 3).last()
println(lastValue) // Outputs: 3
(6) single: Collects and returns the single value emitted by the flow. Throws an exception if the flow emits more than one value or none at all.
val singleValue = flowOf(42).single()
println(singleValue) // Outputs: 42
(7) reduce: Reduces the values emitted by the flow using an accumulator.
val sum = flowOf(1, 2, 3).reduce { accumulator, value ->
    accumulator + value
}
println(sum) // Outputs: 6
(8) fold: Similar to reduce, but starts with an initial value.
val sum = flowOf(1, 2, 3).fold(10) { accumulator, value ->
    accumulator + value
}
println(sum) // Outputs: 16
Kotlin Flow Events
In Kotlin Flow, events are signals or notifications that indicate the lifecycle of a flow during its collection. These events help track the flow's behavior in different scenarios, such as when the flow starts, emits data, completes, or fails with an error.
There are several types of flow events, and they can be categorized as follows:
GlobalScope.launch {
    val data: Flow<Int> = producer()
        .onStart {
            emit(-1) // We can also emit values from here
            println("start")
        }
        .onCompletion {
            println("complete")
        }
        .onEach {
            println("on each")
        }
    data.collect {
        println("Data Consumer-1 $it")
    }
}
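The error event mentioned above can be handled with the catch operator. The following is a sketch under assumptions: recordEvents is a hypothetical helper, and the exception is thrown deliberately to simulate a failing producer.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the lifecycle events of a flow that fails after two values.
fun recordEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flow {
        emit(1)
        emit(2)
        throw IllegalStateException("boom") // simulated failure, for illustration only
    }
        .onStart { events += "start" }
        .onEach { events += "each $it" }
        .onCompletion { cause ->
            // cause is null on normal completion and non-null when the flow failed
            events += if (cause == null) "complete" else "complete with error"
        }
        .catch { e -> events += "caught ${e.message}" } // handles the upstream exception
        .collect { events += "collected $it" }
    events
}

fun main() {
    recordEvents().forEach(::println)
}
```

Because catch only handles exceptions from operators above it, it is placed after onCompletion here so that onCompletion still sees the failure before catch consumes it.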
Flow Context Preservation:
By default, the code inside a flow runs in the same coroutine context as the collector. Flow assumes that data is emitted and collected in the same coroutine context, and it enforces this, which is why this behavior is known as context preservation. Sometimes in the Android framework we need to emit and collect data in different coroutine contexts. For example, we make a network call on an IO thread but update the UI on the Main thread. In Kotlin Flow we use the flowOn() operator for this purpose, and this is called context switching. We pass a dispatcher (for example Dispatchers.IO) as the argument to flowOn(), so that the producer and any operators upstream of that call run on that dispatcher.
Syntax and Example:
val flow = flow {
    for (i in 1..3) {
        println("Emitting $i on ${Thread.currentThread().name}")
        emit(i)
    }
}.flowOn(Dispatchers.Default) // Upstream operations will run on Dispatchers.Default
flow.collect { value ->
    println("Collected $value on ${Thread.currentThread().name}")
}
Output:
Emitting 1 on DefaultDispatcher-worker-1
Emitting 2 on DefaultDispatcher-worker-1
Emitting 3 on DefaultDispatcher-worker-1
Collected 1 on main
Collected 2 on main
Collected 3 on main
Kotlin Shared Flow:
SharedFlow is a hot flow introduced in Kotlin Coroutines to deliver a stream of events to multiple subscribers. It is useful when you want to broadcast values to multiple consumers and ensure that all of them receive the same updates.
Creating a Shared Flow:
We can create a shared flow using the MutableSharedFlow class.
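import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Create a MutableSharedFlow (no replay, no extra buffer)
    val sharedFlow = MutableSharedFlow<Int>()
    // Collector 1
    val job1 = launch {
        sharedFlow.collect { value ->
            println("Collector 1 received: $value")
        }
    }
    // Collector 2
    val job2 = launch {
        sharedFlow.collect { value ->
            println("Collector 2 received: $value")
        }
    }
    // Let both collectors subscribe first: values emitted with no subscribers are dropped
    delay(100)
    // Emit values
    sharedFlow.emit(1)
    sharedFlow.emit(2)
    // A shared flow never completes, so cancel the collectors to let main return
    delay(100)
    job1.cancel()
    job2.cancel()
}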
Output:
Collector 1 received: 1
Collector 2 received: 1
Collector 1 received: 2
Collector 2 received: 2
Replay and Extra Buffering
If a consumer starts collecting late, we can configure the shared flow to replay the most recent items to it.
val sharedFlow = MutableSharedFlow<Int>(replay = 2) // Replays the last 2 values
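To illustrate replay, the sketch below emits three values before any collector exists and then lets a late collector join. The function name lateCollectorSees is hypothetical, and take(2) is used only so that the example terminates, since a shared flow never completes by itself.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns what a late collector receives from the replay cache.
fun lateCollectorSees(): List<Int> = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)

    // No collectors yet: the values are not delivered, but the last two stay in the replay cache.
    sharedFlow.emit(1)
    sharedFlow.emit(2)
    sharedFlow.emit(3)

    // The late collector first receives the replayed values; take(2) then ends the collection.
    sharedFlow.take(2).toList()
}

fun main() {
    println(lateCollectorSees())
}
```

The late collector misses the first value, because only the two most recent emissions are kept for replay.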
Kotlin State Flow:
StateFlow is a hot flow provided by the Kotlin Coroutines library, designed to hold and manage state in a reactive programming model. It serves as a Kotlin-friendly alternative to LiveData for managing and observing state in Android applications.
Like any hot flow, a StateFlow can have multiple consumers. It always preserves the latest value of the flow, so it maintains a state. It behaves like a buffer that holds a single value, so every consumer gets the current value, even a consumer that joins late.
Creating State Flow:
To create and manage a StateFlow, you can use its mutable counterpart: MutableStateFlow.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Create a MutableStateFlow with an initial value
    val stateFlow = MutableStateFlow("Initial State")
    // Launch a coroutine that starts collecting late, after 4000 ms
    val collector = launch {
        delay(4000)
        stateFlow.collect { value ->
            println("Collector received: $value")
        }
    }
    // Update the state
    stateFlow.value = "Updated State 1" // We can also use emit here
    delay(100)
    stateFlow.value = "Updated State 2"
    // A state flow never completes, so cancel the collector to let main return
    delay(5000)
    collector.cancel()
}
Output:
Collector received: Updated State 2
Note: Because the collector waits 4000 ms before it starts collecting, it receives only the last value. StateFlow preserves only the latest value.
State Flow vs Live Data:
Channels
Channels are also part of the Kotlin Coroutines library and are used for communication between coroutines.
A channel is like a pipe: one coroutine sends data into the channel using send(), and another coroutine receives the data using receive(). Channels work like a queue, where you can put data in and take it out one item at a time. However, each event sent through a Channel is consumed by a single subscriber, unlike a SharedFlow, where each emission is delivered to all collectors.
Basic Steps of Code to Create a Channel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<String>() // Create a channel that carries String values
    val producer = launch {
        val messages = listOf("Hello", "World", "From", "Kotlin")
        for (message in messages) {
            println("Sending: $message")
            channel.send(message) // Sends each message to the channel
        }
        channel.close() // Closes the channel after sending all messages
    }
    val consumer = launch {
        for (message in channel) { // Receives messages until the channel is closed
            println("Received: $message")
        }
        println("Channel is closed, no more messages to receive.")
    }
    // Wait for producer and consumer to finish
    producer.join()
    consumer.join()
}
Channels vs Shared Flow:
Channels:
Purpose :
Nature:
Producers:
Consumers:
Shared Flow:
Purpose :
Nature:
Producers:
Consumers:
Note:
When multiple consumers are actively receiving from the same channel, the channel delivers each value to exactly one of them, serving waiting consumers in turn. This prevents a single consumer from taking all the data and gives every consumer the opportunity to process some of it. This fair distribution is particularly useful when several consumers work concurrently, because it balances the workload and helps avoid bottlenecks in your application.
Flows handle backpressure by design: emit() suspends while the collector is busy, and operators such as buffer() and conflate() let you tune this behavior. Channels also suspend send() when their buffer is full, but the capacity and the overflow behavior are decisions you make yourself when you create the channel.
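The distribution across consumers described in the note can be sketched as follows. The function name fanOut is hypothetical. How the values are split between the two consumers can vary, so the example only shows that every value is received exactly once.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends 1..count into one channel read by two consumers
// and returns what each consumer received.
suspend fun fanOut(count: Int): Pair<List<Int>, List<Int>> = coroutineScope {
    val channel = Channel<Int>()
    val first = mutableListOf<Int>()
    val second = mutableListOf<Int>()

    // Two consumers iterate over the same channel; each value goes to only one of them.
    val consumer1 = launch { for (x in channel) first += x }
    val consumer2 = launch { for (x in channel) second += x }

    for (i in 1..count) channel.send(i)
    channel.close() // ends both consumer loops

    consumer1.join()
    consumer2.join()
    first to second
}

fun main() = runBlocking {
    val (a, b) = fanOut(6)
    println("Consumer 1: $a")
    println("Consumer 2: $b")
    // Combined, the two lists contain every value exactly once.
    println((a + b).sorted())
}
```

With a SharedFlow in place of the channel, both collectors would instead receive all six values.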
Conclusion:
Kotlin Flows are a cornerstone of modern asynchronous programming, enabling developers to handle complex operations seamlessly and build more responsive, performant applications. By mastering Kotlin Flows, you unlock the ability to create robust, reactive solutions tailored to your application's needs. This concludes our exploration of Kotlin Flows. Now it's your turn to put them into action and take your asynchronous programming to the next level!