Kotlin Flows and Channels

In modern software development, asynchronous programming has become a crucial aspect of building responsive and efficient applications. Kotlin Flows, part of the kotlinx.coroutines library, are designed to handle asynchronous stream processing in an intuitive and structured way.

Flows act as a pipeline through which data can flow asynchronously, enabling reactive-style programming.

In a reactive-style system, data streams are treated as first-class citizens. Components within the system can subscribe to these streams to receive data updates and react accordingly.

In Android application development, we use Flow wherever data arrives continuously as a stream. Some scenarios in which data arrives as a stream:

  • Stock market data
  • GPS data
  • Video streaming
  • Bluetooth speaker audio from a mobile device


Flow Data Streams

Kotlin coroutines provide two main tools for dealing with streams:

  • Flows (Emit and Collect)
  • Channels (Send and Receive)

In Kotlin's Flow, the concepts of producers and consumers are key to understanding data streaming and how Flow handles asynchronous data.

Producers in Flow: A producer is responsible for emitting data items into the stream. It defines how and when data is generated and sent downstream for consumption.

Consumers in Flow: A consumer is responsible for collecting the data emitted by a producer. Consumers observe and handle each value they receive.

Channels are hot, and Flows are mostly cold (SharedFlow and StateFlow, covered later, are hot). A hot producer keeps producing data whether or not any consumer is consuming it. When a consumer subscribes to a hot stream, it receives only the values emitted from that point forward. Hot streams typically represent data sources that don't stop or reset for each subscriber.

A cold producer, by contrast, does not produce any data until it has a consumer. Each time a consumer starts collecting, the stream begins emitting values independently for that specific consumer.
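A minimal sketch of the cold behaviour described above, using the flow { } builder that is introduced later in this article. The counter producerRuns and the function coldNumbers are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter that records how many times the producer block has run.
var producerRuns = 0

// A cold flow: the block below runs only when a consumer collects it.
fun coldNumbers(): Flow<Int> = flow {
    producerRuns++
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val numbers = coldNumbers()
    println("Runs before collecting: $producerRuns") // 0, nothing has been produced yet

    val first = numbers.toList()  // first consumer: the producer runs from the start
    val second = numbers.toList() // second consumer: the producer runs again, independently

    println("$first $second, runs: $producerRuns") // [1, 2] [1, 2], runs: 2
}
```

Each call to toList() is a separate collection, so the producer block runs once per consumer and both consumers see the full sequence.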

send(): The send function sends a value into a channel from a coroutine, which lets one coroutine communicate with another by passing values through the channel. send is a suspending function: if the channel has no free buffer space (or, for an unbuffered channel, no receiver is waiting), it suspends the calling coroutine until the value can be delivered.

receive(): The receive function retrieves a value that another coroutine has sent into the channel. It is also a suspending function: if the channel is empty, it suspends the calling coroutine until a value becomes available.

Basic Steps of Code to Create Flow

We can create a flow using the flow { ... } builder, and inside it we can emit values using the emit() function.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun producer() = flow<Int> {
    val list = listOf(1, 2, 3, 4, 5)
    list.forEach {
        delay(1000)
        emit(it) // Emit the value downstream
    }
}

We can collect the values in the consumer using the collect() function:

// GlobalScope keeps the snippet short; in an app, prefer a lifecycle-bound scope
GlobalScope.launch {
    val data: Flow<Int> = producer().buffer(3)

    data.map {
        it * 2
    }.filter { it < 8 }.collect {
        println("From Collector-1 $it")
    }
}

Output:

From Collector-1 2
From Collector-1 4
From Collector-1 6

Flow operators:

Flow operators fall into the following categories:

Intermediate (non-terminal) operators (used in the example above):

  • buffer(): Runs the producer and the collector in separate coroutines and buffers emitted items between them. If the collector is slower than the producer, the producer can keep emitting into the buffer instead of waiting for each item to be consumed, which reduces the total processing time.
  • map(): Transforms each value emitted by the flow. It applies the given transformation function to each value and returns a new flow containing the transformed values.
  • filter(): Emits only the values that satisfy a given condition. It works like filter on collections, letting through only the values that match the predicate.
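To make the effect of buffer() concrete, here is a rough sketch that times the same slow pipeline with and without it. The names slowProducer and timeCollection and the 100 ms delays are assumptions chosen for illustration, and the measured times will vary from run to run.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical producer: takes about 100 ms to produce each of 5 items.
fun slowProducer(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

// Collects the flow with a consumer that needs about 100 ms per item
// and returns the total elapsed time in milliseconds.
suspend fun timeCollection(source: Flow<Int>): Long = measureTimeMillis {
    source.collect { delay(100) }
}

fun main() = runBlocking {
    // Without buffer: producing and consuming alternate, so the delays add up.
    val sequential = timeCollection(slowProducer())

    // With buffer: the producer keeps working while the consumer is busy.
    val buffered = timeCollection(slowProducer().buffer())

    println("without buffer: $sequential ms, with buffer: $buffered ms")
}
```

Under these assumptions the buffered run should finish noticeably sooner, because the producer's delays overlap with the consumer's.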

Terminal Operators:

A terminal operator starts the flow and is the final step in a flow pipeline. Terminal operators are suspend functions. Without a terminal operator, a cold flow does not run: none of its intermediate operations or events execute.

Here are some commonly used terminal operators in Flow:

(1) collect : The primary terminal operator used to collect values emitted by the flow.

flowOf(1, 2, 3).collect { value ->
    println(value) // Outputs: 1, 2, 3
}        

(2) toList: Collects all values emitted by the flow into a list.

val list = flowOf(1, 2, 3).toList()
println(list) // Outputs: [1, 2, 3]        

(3) toSet: Collects all values into a set, removing duplicates.

val set = flowOf(1, 2, 2, 3).toSet()
println(set) // Outputs: [1, 2, 3]        

(4) first: Returns the first value emitted by the flow and then stops collecting.

val firstValue = flowOf(1, 2, 3).first()
println(firstValue) // Outputs: 1        

(5) last: Collects and returns the last value emitted by the flow.

val lastValue = flowOf(1, 2, 3).last()
println(lastValue) // Outputs: 3        

(6) single: Collects and returns the single value emitted by the flow. Throws an exception if the flow emits more than one value or none at all.

val singleValue = flowOf(42).single()
println(singleValue) // Outputs: 42        

(7) reduce: Reduces the values emitted by the flow using an accumulator.

val sum = flowOf(1, 2, 3).reduce { accumulator, value ->
    accumulator + value
}
println(sum) // Outputs: 6        

(8) fold: Similar to reduce, but starts with an initial value.

val sum = flowOf(1, 2, 3).fold(10) { accumulator, value ->
    accumulator + value
}
println(sum) // Outputs: 16        
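The exception behaviour of single mentioned in item (6) can be sketched as follows. The helper describeSingle is a hypothetical name. The exception types shown are the ones documented for kotlinx.coroutines (NoSuchElementException for an empty flow, IllegalArgumentException for more than one value).

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports what single() does for a given flow.
fun describeSingle(source: Flow<Int>): String = runBlocking {
    try {
        "value ${source.single()}"
    } catch (e: NoSuchElementException) {
        "no value"            // the flow completed without emitting anything
    } catch (e: IllegalArgumentException) {
        "more than one value" // the flow emitted a second value
    }
}

fun main() {
    println(describeSingle(flowOf(42)))   // value 42
    println(describeSingle(emptyFlow()))  // no value
    println(describeSingle(flowOf(1, 2))) // more than one value
}
```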

Kotlin Flow Events

In Kotlin Flow, events are signals that mark points in the lifecycle of a flow during its collection. They help to track the flow's behavior in different scenarios, such as when the flow starts, emits a value, completes, or fails with an error.

Flow events are handled with the following operators:

  • onStart: Triggered when flow collection starts.
  • onEach: Triggered for every value emitted by the flow.
  • onCompletion: Triggered when the flow completes, whether normally or due to an exception.
  • catch: Triggered when the upstream flow throws an exception.

GlobalScope.launch {
    val data: Flow<Int> = producer()
        .onStart {
            emit(-1) // onStart can also emit a value before the flow's own values
            println("start")
        }
        .onCompletion { println("complete") }
        .onEach { println("on each") }

    data.collect { println("Data Consumer-1 $it") }
}
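The example above does not use catch, so here is a small sketch of how the four event operators interact when the producer fails. failingFlow and collectWithEvents are hypothetical names, and the events are recorded in a list rather than printed so that their order is easy to inspect.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlocking

// Hypothetical producer that fails after emitting two values.
fun failingFlow(): Flow<Int> = flow {
    emit(1)
    emit(2)
    throw IllegalStateException("boom")
}

// Collects failingFlow() and returns the events in the order they happened.
fun collectWithEvents(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    failingFlow()
        .onStart { log += "start" }
        .onEach { log += "each $it" }
        // cause is null on normal completion, otherwise the exception
        .onCompletion { cause -> log += "completion: ${cause?.message}" }
        // catch handles exceptions thrown upstream of this operator
        .catch { e -> log += "caught: ${e.message}" }
        .collect { value -> log += "collected $value" }
    log
}

fun main() {
    collectWithEvents().forEach(::println)
    // start, each 1, collected 1, each 2, collected 2, completion: boom, caught: boom
}
```

Because onCompletion is placed before catch in this chain, it still sees the exception as its cause. catch then handles the exception, so collect returns normally.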

Flow Context Preservation:

By default, the code inside a flow builder runs in the coroutine context of the collector: values are emitted in the same context in which collect is called. This property is known as context preservation, because the flow preserves the collector's context.

Sometimes, for example in the Android framework, we need to emit and collect data in different coroutine contexts: a network call should run on an IO thread, while the UI must be updated on the Main thread. Kotlin Flow provides the flowOn() operator for this purpose. We pass a dispatcher (such as Dispatchers.IO) as the argument to flowOn, and the producer and every operator upstream of flowOn run on that dispatcher, while the collector stays in its own context.

Syntax and Example:

val flow = flow {
    for (i in 1..3) {
        println("Emitting $i on ${Thread.currentThread().name}")
        emit(i)
    }
}.flowOn(Dispatchers.Default) // Upstream operations will run on Dispatchers.Default

flow.collect { value ->
    println("Collected $value on ${Thread.currentThread().name}")
}        

Output:

Emitting 1 on DefaultDispatcher-worker-1
Emitting 2 on DefaultDispatcher-worker-1
Emitting 3 on DefaultDispatcher-worker-1
Collected 1 on main
Collected 2 on main
Collected 3 on main        
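For contrast, the sketch below shows what happens when a producer tries to switch context with withContext instead of flowOn. The function names are hypothetical. The behaviour shown is an assumption based on the documented rule that emitting from a different context violates context preservation and fails with an IllegalStateException.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Violates context preservation: emit is called from a different context.
fun wrongContextFlow(): Flow<Int> = flow {
    withContext(Dispatchers.Default) { emit(1) }
}

// The supported way to move the producer to another dispatcher.
fun rightContextFlow(): Flow<Int> = flow { emit(1) }.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    try {
        wrongContextFlow().collect { println("Collected $it") }
    } catch (e: IllegalStateException) {
        println("Rejected: emit was called from a different context")
    }

    rightContextFlow().collect { println("Collected $it") }
}
```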

Kotlin Shared Flow:

SharedFlow is a hot flow in Kotlin Coroutines that delivers a stream of events to multiple subscribers. It is useful when you want to broadcast values to multiple consumers and ensure that all of them receive the same updates.

Creating a Shared Flow:

We can create a shared flow using the MutableSharedFlow class.

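import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Create a MutableSharedFlow (no replay, so late subscribers miss earlier values)
    val sharedFlow = MutableSharedFlow<Int>()

    // Collector 1
    val collector1 = launch {
        sharedFlow.collect { value ->
            println("Collector 1 received: $value")
        }
    }

    // Collector 2
    val collector2 = launch {
        sharedFlow.collect { value ->
            println("Collector 2 received: $value")
        }
    }

    // Give both collectors time to subscribe before emitting
    delay(100)

    // Emit values
    sharedFlow.emit(1)
    sharedFlow.emit(2)

    // A shared flow never completes, so cancel the collectors to let main() finish
    delay(100)
    collector1.cancel()
    collector2.cancel()
}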

Output:

Collector 1 received: 1
Collector 2 received: 1
Collector 1 received: 2
Collector 2 received: 2        

Replay and Extra Buffering

If a consumer starts collecting late, a shared flow can replay the most recent items to it. The replay parameter sets how many of the latest values are replayed to new collectors:

val sharedFlow = MutableSharedFlow<Int>(replay = 2) // Replays the last 2 values        
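A short sketch of replay in action, assuming a collector that subscribes only after three values have already been emitted. lateCollectorValues is a hypothetical name.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns what a late collector sees on a shared flow with replay = 2.
fun lateCollectorValues(): List<Int> = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)

    // No collector yet: the values only go into the replay cache,
    // which keeps the two most recent ones.
    sharedFlow.emit(1)
    sharedFlow.emit(2)
    sharedFlow.emit(3)

    // A late collector first receives the replayed values.
    // take(2) ends the collection, because a shared flow itself never completes.
    sharedFlow.take(2).toList()
}

fun main() {
    println(lateCollectorValues()) // [2, 3]
}
```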

Kotlin State Flow :

StateFlow is a hot flow provided by the Kotlin Coroutines library, designed to hold and manage state in a reactive programming model. It serves as a Kotlin-friendly alternative to LiveData for managing and observing state in Android applications.

Like SharedFlow, StateFlow can have multiple consumers. It always preserves the latest value, so it maintains a state. It behaves like a buffer that holds a single value: every consumer receives the current value, even one that joins late.


Creating State Flow:

To create and manage a StateFlow, you can use its mutable counterpart: MutableStateFlow.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Create a MutableStateFlow with an initial value
    val stateFlow = MutableStateFlow("Initial State")

    // Launch a coroutine that starts collecting only after 4000 ms
    val collector = launch {
        delay(4000)
        stateFlow.collect { value ->
            println("Collector received: $value")
        }
    }

    // Update the state by assigning a new value
    stateFlow.value = "Updated State 1" // we can also use emit here
    delay(100)
    stateFlow.value = "Updated State 2"

    // A state flow never completes, so cancel the collector once it has printed
    delay(5000)
    collector.cancel()
}

Output:

Collector received: Updated State 2

Note: Because the collector starts only after a delay of 4000 ms, it receives only the last value: StateFlow keeps just the latest value.
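A common way to use StateFlow as a state holder is to keep the mutable flow private and expose it as a read-only StateFlow. The sketch below assumes a hypothetical CounterHolder class, which could stand in for a ViewModel or a repository.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.runBlocking

// Hypothetical state holder: only this class can change the state.
class CounterHolder {
    private val _count = MutableStateFlow(0)

    // Read-only view for consumers
    val count: StateFlow<Int> = _count.asStateFlow()

    // update applies the change atomically to the current value
    fun increment() {
        _count.update { it + 1 }
    }
}

fun main() = runBlocking {
    val holder = CounterHolder()
    holder.increment()
    holder.increment()

    println(holder.count.value)   // 2, read synchronously
    println(holder.count.first()) // 2, a new collector gets the current value first
}
```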

State Flow vs Live Data:

  • With LiveData, transformations such as map run on the main thread, which can hurt performance. With StateFlow, operators applied to the flow can be moved to another dispatcher using flowOn.
  • Flow has more operators than LiveData.
  • LiveData is tied to the Android lifecycle, whereas StateFlow can be used anywhere, for example in a repository.
  • StateFlow does not manage the lifecycle automatically the way LiveData does. LiveData is lifecycle-aware and stops delivering updates when the associated lifecycle owner (e.g., an Activity or Fragment) is inactive (for example, after onStop). StateFlow always remains active and continues delivering updates regardless of the lifecycle state of its collectors.
  • While StateFlow is not lifecycle-aware by default, you can integrate it into Android apps with lifecycle tools like repeatOnLifecycle and lifecycleScope to achieve similar behavior. This makes it a modern, flexible alternative to LiveData when using Kotlin Coroutines.

Channels

Channels are also part of the Kotlin Coroutines library and are used for communication between coroutines.

A channel is like a pipe: one coroutine sends data into the channel using send(), and another coroutine receives it using receive(). Channels work like a queue, where you can put data in and take it out one by one. However, each value sent through a channel is consumed by a single receiver, unlike a SharedFlow, where each emission is delivered to all collectors.

Basic Steps of Code to Create a Channel

  • Create a Channel of type String

val channel = Channel<String>() // Create a channel that carries String values

  • Make a producer Coroutine

val producer = launch {
    val messages = listOf("Hello", "World", "From", "Kotlin")
    for (message in messages) {
        println("Sending: $message")
        channel.send(message) // Sends each message to the channel
    }
    channel.close() // Closes the channel after sending all messages
}

  • Make a Consumer Coroutine

val consumer = launch {
    for (message in channel) { // Receives messages until the channel is closed
        println("Received: $message")
    }
    println("Channel is closed, no more messages to receive.")
}

  • Wait for Consumer and Producer to finish

// Wait for producer and consumer to finish
producer.join() 
consumer.join()        
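Put together, the steps above can be sketched as one runnable program. The wrapper function runChannelDemo is a hypothetical name, and the received messages are gathered in a list instead of printed so that the result is easy to check.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Runs one producer and one consumer and returns what the consumer received.
fun runChannelDemo(): List<String> = runBlocking {
    val channel = Channel<String>()
    val received = mutableListOf<String>()

    // Producer: sends every message, then closes the channel
    val producer = launch {
        for (message in listOf("Hello", "World", "From", "Kotlin")) {
            channel.send(message)
        }
        channel.close()
    }

    // Consumer: the for loop ends when the channel is closed
    val consumer = launch {
        for (message in channel) {
            received += message
        }
    }

    // Wait for producer and consumer to finish
    producer.join()
    consumer.join()
    received
}

fun main() {
    println(runChannelDemo()) // [Hello, World, From, Kotlin]
}
```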

Channels vs Shared Flow:

Channels:

Purpose :

  • Channels are mainly used for communication between coroutines.

Nature:

  • Hot and point to point communication.

Producers:

  • Channels typically have one producer.
  • One coroutine sends data, and another coroutine receives it.

Consumers:

  • Channels typically have one consumer.
  • Once data is consumed by one receiver, it is no longer available to others.
  • No replay. Data sent before a receiver starts is not repeated for it.

Shared Flow:

Purpose :

  • Shared Flow shares a stream of data with multiple consumers.

Nature:

  • Hot and multicast, i.e. multiple collectors can receive the same data.

Producers:

  • Shared Flow can have multiple producers.

Consumers:

  • In Shared Flow, multiple collectors can collect the data.
  • Emissions are shared across multiple collectors.
  • Each collector receives every emitted value.
  • Can replay a specified number of values to new collectors.

Note:

When multiple consumers are actively receiving from the same channel, each value is delivered to exactly one of them. Channels are fair: receive calls are served in first-in, first-out order, so the consumers effectively take turns and no single consumer monopolizes the data. This is particularly useful when several consumers work concurrently, because it balances the load between them and helps to avoid bottlenecks.

Both Flows and Channels handle backpressure through suspension. In a Flow, emit suspends until the collector is ready for the next value. In a Channel, send suspends when the buffer is full, so how far the producer can run ahead depends on the capacity chosen when the channel is created.
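The one-consumer-per-value delivery described in this note can be sketched with two consumers reading from the same channel. fanOut and the consumer names are hypothetical. The sketch deliberately makes no claim about which consumer receives which value, only that every value is received exactly once.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sends 1..6 into one channel and returns what each consumer received.
fun fanOut(): Map<String, List<Int>> = runBlocking {
    val channel = Channel<Int>()
    val received = mapOf(
        "consumer-1" to mutableListOf<Int>(),
        "consumer-2" to mutableListOf<Int>()
    )

    // Both consumers read from the same channel
    val consumers = received.values.map { bucket ->
        launch {
            for (value in channel) {
                bucket += value
            }
        }
    }

    for (i in 1..6) {
        channel.send(i)
    }
    channel.close() // ends both consumer loops
    consumers.joinAll()
    received
}

fun main() {
    val result = fanOut()
    println(result)
    // Every value shows up in exactly one consumer's list
    println(result.values.flatten().sorted()) // [1, 2, 3, 4, 5, 6]
}
```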

Conclusion:

Kotlin Flows are a cornerstone of modern asynchronous programming, enabling developers to handle complex operations seamlessly and build more responsive, performant applications. By mastering Kotlin Flows, you unlock the ability to create robust, reactive solutions tailored to your application's needs. This concludes our exploration of Kotlin Flows. Now it's your turn to put them into action and take your asynchronous programming to the next level!
