A Kotlin Flow is an asynchronous stream of values that you subscribe to and receive one by one, as they are produced. It's like subscribing to a newspaper. You don't have to go to the newsstand every day to check whether there is a new issue. You subscribe once, and every time a new issue comes out, it is sent to you automatically. Flow is part of the Kotlin coroutines library (kotlinx.coroutines), so the values can be produced and processed asynchronously.
Figuratively speaking, Flow consists of three objects:
- Creator (Producer)
- Intermediary (Operator)
- Consumer (Collector)
For example, The New York Times writes the newspaper, so it is the Creator. The newsstand where the newspaper is sorted, filtered, and sold is the Intermediary. And we, my friend, are the Consumers.
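The three roles can be sketched in a few lines. This is only an illustration of the analogy: the function names publisher and newsstand and the second newspaper title are made up for this example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Producer: the publisher that emits newspapers (titles are illustrative).
fun publisher(): Flow<String> = flow {
    emit("The New York Times")
    emit("Daily Gossip")
}

// Operator: the newsstand that filters and relabels what passes through.
fun newsstand(papers: Flow<String>): Flow<String> =
    papers
        .filter { it.contains("Times") }
        .map { "On sale: $it" }

fun main() = runBlocking {
    // Consumer: the reader who collects whatever the newsstand offers.
    newsstand(publisher()).collect { println(it) }
}
```

Only the consumer's collect call sets the chain in motion; publisher() and newsstand() merely describe what should happen.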
There are two types of flows: hot and cold. Flows created with the regular builders are cold. A cold flow is one where the creator starts producing the newspaper only after you have subscribed to it. A hot flow keeps producing newspapers whether or not anyone is subscribed.
I'll show you both, but let's start with regular cold flows since they are the standard.
COLD FLOW
"Until you turn on the faucet and start collecting water, it will not start flowing."
Below is an example of a cold flow:
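import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val newspaperFlow = produceNewspapers() // nothing runs yet: the flow is cold
    println("flow created")
    println("flow collection started")
    newspaperFlow.collect { // collecting is what starts the producer
        delay(100)
        println("collected: $it")
    }
}

// Builds a cold flow: the body runs only when a collector subscribes.
fun produceNewspapers() = flow {
    println("flow started")
    val newspapers = listOf("The Guardian", "Medium", "Reddit")
    for (item in newspapers) {
        println("Producing new newspaper: $item")
        emit(item)
    }
}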
Here is what we get as a result:
flow created
flow collection started
flow started
Producing new newspaper: The Guardian
collected: The Guardian
Producing new newspaper: Medium
collected: Medium
Producing new newspaper: Reddit
collected: Reddit
You could try it here: https://pl.kotl.in/2Hyqc0gVk
As you can see, the flow didn't start producing newspapers until I subscribed to it.
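A related consequence of being cold is that the producer block runs again for every new collector. The sketch below shows this with a counter; productionRuns and countedNewspapers are hypothetical names introduced only for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical counter, used only to make the restarts visible.
var productionRuns = 0

fun countedNewspapers(): Flow<String> = flow {
    productionRuns++ // executed once per collector, not once per flow object
    emit("The Guardian")
    emit("Medium")
}

fun main() = runBlocking {
    val newspaperFlow = countedNewspapers()
    println("runs before collecting: $productionRuns")
    newspaperFlow.collect { println("reader A: $it") }
    newspaperFlow.collect { println("reader B: $it") }
    println("runs after two readers: $productionRuns")
}
```

Each reader gets its own full run of the producer, which is why both readers receive every newspaper.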
HOT FLOW
"Water will flow, even if you do not collect it."
Below is an example of a hot flow:
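import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
    // Collecting a shared flow never finishes, so the timeout cancels everything.
    withTimeoutOrNull(1000) {
        val mutableSharedFlow = MutableSharedFlow<String>()
        val sharedFlow = mutableSharedFlow.asSharedFlow() // read-only view

        // Publisher: emits a newspaper every 100 ms, subscribers or not.
        launch {
            for (i in 1..5) {
                delay(100)
                println("emitted $i")
                mutableSharedFlow.emit("Newspaper - $i")
            }
        }
        // Shop A subscribes after 100 ms.
        launch {
            delay(100)
            sharedFlow.collect {
                println("Shop A collected: $it")
            }
        }
        // Shop B subscribes after 300 ms.
        launch {
            delay(300)
            sharedFlow.collect {
                println("Shop B collected: $it")
            }
        }
    }
}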
Here is the result:
emitted 1
emitted 2
Shop A collected: Newspaper - 2
emitted 3
Shop A collected: Newspaper - 3
Shop B collected: Newspaper - 3
emitted 4
Shop A collected: Newspaper - 4
Shop B collected: Newspaper - 4
emitted 5
Shop A collected: Newspaper - 5
Shop B collected: Newspaper - 5
Here is the playground: https://pl.kotl.in/PgWJ-1KnV
As you can see, our newspaper keeps being published regardless of whether anyone is subscribed to it. The first newspaper was missed by both shops. The second one ended up on the shelf only in the first shop. The third newspaper and all subsequent ones went on sale at both shops. The important thing here is that collecting a shared flow never completes on its own. That's why we wrapped everything in withTimeoutOrNull(), which cancels the collectors after one second.
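If missing earlier newspapers is a problem, MutableSharedFlow accepts a replay parameter that keeps the most recent values for late subscribers. This goes slightly beyond the example above, so treat it as a minimal sketch; lateSubscriber is a hypothetical helper written for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Emits two newspapers before anyone subscribes, then lets a late shop ask for one.
suspend fun lateSubscriber(replay: Int): String? {
    val papers = MutableSharedFlow<String>(replay = replay)
    papers.emit("Newspaper - 1") // no subscribers yet, so emit returns immediately
    papers.emit("Newspaper - 2")
    // Without a replayed value first() would wait forever, hence the timeout.
    return withTimeoutOrNull(200) { papers.first() }
}

fun main() = runBlocking {
    println(lateSubscriber(replay = 0)) // null: both newspapers were missed
    println(lateSubscriber(replay = 1)) // Newspaper - 2: the latest one was kept
}
```

With replay = 0 (the default used in the example above) the late shop gets nothing; with replay = 1 it receives the latest issue right after subscribing.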
Besides the flow builder, there are other ways to create a Flow. Here are a few of them:
flowOf("Mars", "Twix", "Snickers")
listOf("Pepsi", "Cola", "Fanta").asFlow()
(1..3).asFlow()
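To see that these builders really produce flows with the listed values, here is a small runnable sketch. The function names snacks, drinks, and numbers are made up for this example, and toList() is used only to print the contents.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun snacks(): Flow<String> = flowOf("Mars", "Twix", "Snickers")          // from fixed values
fun drinks(): Flow<String> = listOf("Pepsi", "Cola", "Fanta").asFlow()   // from a collection
fun numbers(): Flow<Int> = (1..3).asFlow()                               // from a range

fun main() = runBlocking {
    println(snacks().toList())  // [Mars, Twix, Snickers]
    println(drinks().toList())  // [Pepsi, Cola, Fanta]
    println(numbers().toList()) // [1, 2, 3]
}
```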
OPERATORS
Operators are just as important. These are the tools that transform, filter, and otherwise process flow data at the intermediary stage.
For example:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Simulates a slow labeling step.
suspend fun addMark(drink: String): String {
    delay(500)
    return "EU: $drink"
}

fun main() = runBlocking {
    val drinksFlow = flowOf("Coca-Cola", "Kofola Zero", "Fanta Zero", "Sprite", "Coca-Cola Zero")
    drinksFlow
        .map { addMark(it) } // apply some transformation
        .filter { it.contains("Zero") } // take only those that do not contain sugar
        .take(2) // take only the first two results
        .collect { println(it) } // print results
}
Try it here: https://pl.kotl.in/qm9OpTL3T
Let's figure out together what's going on here. We have a flow of drinks: two regular ones and three without sugar.
- .map - here we add a quality label to each product, so "Fanta Zero" becomes "EU: Fanta Zero". Taken on its own, this operator turns our flow into a flow with the following data: ["EU: Coca-Cola", "EU: Kofola Zero", "EU: Fanta Zero", "EU: Sprite", "EU: Coca-Cola Zero"]
- .filter - then we filter the labeled data. The result is a flow with data like this: ["EU: Kofola Zero", "EU: Fanta Zero", "EU: Coca-Cola Zero"]
- .take(2) - this operator limits the flow to the first 2 values. The result is the following: ["EU: Kofola Zero", "EU: Fanta Zero"]. Keep in mind that a flow processes elements one at a time through the whole chain, so once take(2) has its two values it cancels the rest, and "Sprite" and "Coca-Cola Zero" never even reach .map.
- Finally, we call a different type of operator, which launches this whole machine, collects the processed data, and prints it to the log. Let's run this code and check what we got.
EU: Kofola Zero
EU: Fanta Zero
Process finished with exit code 0
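A flow pushes each element through the whole chain before moving on to the next one, and take(2) stops the upstream as soon as it has two values. The sketch below records which drinks actually reach the map step. It is a simplified variant of the example above: the delay is dropped, and drinksThatReachedMap is a hypothetical helper written for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Runs the pipeline and returns the drinks that reached the map step, in order.
suspend fun drinksThatReachedMap(): List<String> {
    val reached = mutableListOf<String>()
    flowOf("Coca-Cola", "Kofola Zero", "Fanta Zero", "Sprite", "Coca-Cola Zero")
        .map {
            reached.add(it) // remember every drink that gets labeled
            "EU: $it"
        }
        .filter { it.contains("Zero") }
        .take(2) // cancels the upstream after the second match
        .collect { println(it) }
    return reached
}

fun main() = runBlocking {
    println(drinksThatReachedMap()) // [Coca-Cola, Kofola Zero, Fanta Zero]
}
```

"Sprite" and "Coca-Cola Zero" are never labeled, because the flow is already finished by the time they would be emitted.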
You may have noticed that I did not name .collect in the walkthrough above. That's because it is not an intermediate operator but a terminal one.
Yes, there are two types of operators in flows, but it's easy. Essentially, the only difference is that intermediate operators take a flow and return a new flow without starting it, while terminal operators are suspending functions that start the collection and return a specific value (or nothing, in the case of collect).
Here are examples of terminal operators:
- collect(): collects every value from the flow and passes it to the given action
- first(): returns the first value from the flow and stops collecting. If the flow is empty, it throws NoSuchElementException
- last(): returns the last value from the flow. If the flow is empty, it throws NoSuchElementException
- toList(): converts flow values into a List collection.
- toSet(): converts flow values into a Set collection.
- count(): returns the number of elements in the flow.
- reduce(): accumulates a value starting from the first element and applying an operation to each following one. If the flow is empty, it throws NoSuchElementException
- fold(): the same as reduce, but it takes an initial value, and if the flow is empty, it returns that initial value instead of throwing.
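Here is a short sketch of the terminal operators listed above, applied to a small flow of numbers. The flow contents and the helper sumOrNull are made up for this example; the helper only exists to show how reduce behaves on an empty flow.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun prices(): Flow<Int> = flowOf(3, 1, 2, 3)

// reduce() throws on an empty flow, so this hypothetical helper maps that case to null.
suspend fun sumOrNull(flow: Flow<Int>): Int? =
    try {
        flow.reduce { acc, value -> acc + value }
    } catch (e: NoSuchElementException) {
        null
    }

fun main() = runBlocking {
    println(prices().first())                               // 3
    println(prices().last())                                // 3
    println(prices().toList())                              // [3, 1, 2, 3]
    println(prices().toSet())                               // [3, 1, 2]
    println(prices().count())                               // 4
    println(sumOrNull(prices()))                            // 9
    println(sumOrNull(emptyFlow()))                         // null
    println(emptyFlow<Int>().fold(10) { acc, v -> acc + v }) // 10
}
```

Every call above starts a fresh collection of the cold flow, which is why the same flow can be queried several times.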
And here are some of the intermediate operators:
- map(): transforms each element into something else
- filter(): you are already familiar with this, it filters the flow, leaving only those elements that match the condition
- onEach(): performs an action on each element before passing it on unchanged
- combine(): combines the most recent values of two flows into one flow, emitting whenever either of them emits
- zip(): pairs up the values of two flows using the transform function and completes as soon as either flow completes
- take(n): returns a flow with only the first n elements and cancels the rest
- transform(): applies a transform function to each element and can emit any number of values for it
- drop(n): returns a flow that ignores the first n elements
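A few of these operators are easier to grasp in action. The sketch below is a minimal illustration with made-up values and function names (dropThenTake, echoUppercase, zipped); combine is left out because its output depends on timing.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// drop(1) skips the first element, take(2) keeps the next two.
fun dropThenTake(): Flow<Int> = (1..4).asFlow().drop(1).take(2)

// transform() may emit several values per input element.
fun echoUppercase(): Flow<String> = flowOf("a", "b").transform {
    emit(it)
    emit(it.uppercase())
}

// zip() pairs elements by position and stops with the shorter flow.
fun zipped(): Flow<String> =
    flowOf(1, 2, 3).zip(flowOf("x", "y")) { number, letter -> "$number$letter" }

fun main() = runBlocking {
    println(dropThenTake().toList())  // [2, 3]
    println(echoUppercase().toList()) // [a, A, b, B]
    println(zipped().toList())        // [1x, 2y]

    // onEach() observes elements without changing them.
    val seen = mutableListOf<Int>()
    println(flowOf(1, 2).onEach { seen.add(it) }.toList()) // [1, 2]
    println(seen)                                          // [1, 2]
}
```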
So, we got acquainted with a very important technology that developers use daily. Now you know what flows are, what kinds of flows there are, and how to use them. Try, practice, and enjoy coding. Thanks, everybody, I wish you a good flow! Bye! Bye!
Resources:
Article Photo by PAN XIAOZHEN, Raphael Koh, Levon Vardanyan