Tokio Async APIS - Random notes
In this article, we will explore how to effectively use Tokio and the Futures crate for asynchronous programming in Rust. We’ll cover setting up your project, creating runtimes, spawning tasks, working with channels, handling mutexes, and more. Let’s dive in!
1. Setting Up Tokio and Futures in Your Rust Project
To get started, add Tokio and Futures to your project’s Cargo.toml file:
// Run the following commands in your terminal:
cargo add tokio -F full // Adds Tokio with all features enabled
cargo add futures // Adds the Futures crate
By enabling the full feature set, you'll have access to all of Tokio's modules, such as networking, timers, and synchronization primitives.
2. Creating a Tokio Runtime
A Tokio runtime allows you to run asynchronous code. You can create the runtime manually:
use tokio::runtime;
async fn async_fun() {
println!("Hello from async");
}
fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async_fun());
}
Alternatively, you can convert the main function to async using macros:
async fn async_fun() {
println!("Hello from async");
}
#[tokio::main] // Default multi-threaded runtime
async fn main() {
// Async code
async_fun().await;
}
#[tokio::main(flavor = "current_thread")] // Single-threaded runtime
async fn main() {
// Async code
async_fun().await;
}
3. Avoid Blocking in Async Code
Important Note: Never use std::thread::sleep() within async tasks. This function blocks the entire thread, causing all tasks to run serially. Instead, use tokio::time::sleep():
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
This allows the Tokio runtime to continue running other tasks while the current one sleeps, preserving concurrency.
4. Spawning Asynchronous Tasks
To run an async operation concurrently, use tokio::spawn:
async fn asyn_fun() {
for i in 0..10 {
println!("Jai Shree Ram {i}");
}
}
#[tokio::main]
async main() {
tokio::spawn(asyn_fun());
}
You can also include the async block in spawn as below:
tokio::spawn(async move {
// Async work here
});
Note: tokio::spawn returns a JoinHandle that must be awaited to retrieve the result or handle completion:
5. Blocking Operations in Tokio
For blocking code, use tokio::task::spawn_blocking to run it on a separate thread without affecting the async runtime:
use tokio::task::spawn_blocking;
use std::time::Duration;
let result = spawn_blocking(move || {
// Blocking operation
std::thread::sleep(Duration::from_millis(500));
}).await;
Detached Blocking Tasks:
If you need to run a task independently, avoid using .await:
let result = spawn_blocking(move || {
// Detached blocking operation
std::thread::sleep(Duration::from_millis(500));
});
6. Yielding Control in Async Tasks
To yield control and allow other tasks to run, use:
tokio::task::yield_now().await;
7. Channels in Tokio
Using tokio::sync::mpsc Channels:
Create a multi-producer, single-consumer (MPSC) channel:
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(100);
// Note: Both `send()` and `recv()` need to be awaited.
The receiver must always be declared as mutable (mut) because it is consumed as the stream is processed.
Using Broadcast Channels:
Tokio provides broadcast channels, which allow multiple receivers:
let (tx, mut rx) = tokio::sync::broadcast::channel::<String>(100);
Note: Broadcast channels are not available in std::mpsc.
8. Handling Blocking and Async Operations Together
You can obtain a handle to the current Tokio runtime:
let handle = tokio::runtime::Handle::current();
This handle can be used to spawn async tasks from a synchronous context:
handle.spawn(async move {
// Perform async operations
});
Recommended by LinkedIn
9. Canceling Async Tasks
You can cancel tasks using the abort() method on the JoinHandle:
async fn run() {
let handle = tokio::spawn(async {
// Some async task
});
// Cancel the task
handle.abort();
}
10. Using tokio::sync::Mutex
The standard library’s Mutex is not suitable for async tasks because it can block an entire thread. Instead, use Tokio’s async mutex:
use tokio::sync::Mutex;
use once_cell::sync::Lazy;
static COUNTER: Lazy<Mutex<u32>> = Lazy::new(|| Mutex::new(0));
async fn increment() {
let mut counter = COUNTER.lock().await;
*counter += 1;
}
11. Selecting Between Futures Using tokio::select!
You can use tokio::select! to run multiple futures concurrently, and act when the first one completes:
tokio::select! {
_ = function1() => println!("function1() completed first"),
_ = function2() => println!("function2() completed first"),
}
12. Complete Example: Async Channels in Action
Here is a complete example demonstrating the use of Tokio async channels:
#[tokio::main]
async fn main() {
let (tx, rx) = std::sync::mpsc::channel::<Command>();
let (tx_reply, mut rx_reply) = tokio::sync::mpsc::channel::<String>(10);
let handle = tokio::runtime::Handle::current();
std::thread::spawn(move || {
while let Ok(command) = rx.recv() {
match command {
Command::Print(s) => {
let tx_reply = tx_reply.clone();
handle.spawn(async move {
tx_reply.send(s).await.unwrap();
});
},
}
}
});
tokio::spawn(async move {
while let Some(reply) = rx_reply.recv().await {
println!("{reply}");
}
});
let mut counter = 0;
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
tx.send(Command::Print(format!("Hello {counter}"))).unwrap();
counter += 1;
}
}
Note: Both send() and recv() operations must be awaited, and the receiver must be mutable.
13. Timeout Handling with tokio::time::timeout
Sometimes you need to ensure that an async operation completes within a specific duration. Tokio provides a way to enforce timeouts using tokio::time::timeout:
use tokio::time::{timeout, Duration};
async fn run_with_timeout() {
let result = timeout(Duration::from_secs(5), async_task()).await;
match result {
Ok(_) => println!("Task completed within the timeout."),
Err(_) => println!("Task timed out!"),
}
}
Note: If the async task does not complete within the specified time, it returns an Err.
14. Interval-Based Operations with tokio::time::interval
For tasks that need to run periodically, use tokio::time::interval. This creates a recurring timer:
use tokio::time::{interval, Duration};
#[tokio::main]
async fn main() {
let mut interval = interval(Duration::from_secs(2));
loop {
interval.tick().await;
println!("This message prints every 2 seconds.");
}
}
15. One-Shot Channels with tokio::sync::oneshot
oneshot channels are used for sending a single message between tasks:
use tokio::sync::oneshot;
async fn example() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let _ = tx.send("Hello, world!");
});
let message = rx.await.expect("Failed to receive message");
println!("{message}");
}
Use Case: Useful when you want to notify one task from another or send a single response.
16. Reading and Writing Files Asynchronously
Tokio provides tokio::fs for async file operations. You can read from or write to files without blocking the main async runtime:
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
async fn read_write_example() -> io::Result<()> {
let mut file = File::open("foo.txt").await?;
let mut contents = vec![];
file.read_to_end(&mut contents).await?;
let mut file = File::create("bar.txt").await?;
file.write_all(&contents).await?;
Ok(())
}
17. Using tokio::sync::RwLock for Async Read-Write Locking
For scenarios where you need concurrent read access but exclusive write access, tokio::sync::RwLock can be used:
use tokio::sync::RwLock;
use std::sync::Arc;
async fn use_rwlock() {
let data = Arc::new(RwLock::new(5));
let read_data = data.read().await;
println!("Read: {}", *read_data);
// Drop the read lock before acquiring the write lock
drop(read_data);
let mut write_data = data.write().await;
*write_data += 1;
}
18. Using tokio::join! for Concurrent Task Execution
If you need to run multiple async tasks concurrently and wait for all of them to complete, use tokio::join!:
async fn task1() {
tokio::time::sleep(Duration::from_secs(2)).await;
println!("Task 1 completed");
}
async fn task2() {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Task 2 completed");
}
#[tokio::main]
async fn main() {
tokio::join!(task1(), task2());
println!("Both tasks have completed.");
}
19. Combining Futures with futures::join! and futures::try_join!
While tokio::join! waits for all futures, you can also use futures::try_join!, which stops at the first future that returns an error:
use futures::try_join;
async fn task_with_result() -> Result<(), &'static str> {
Ok(())
}
async fn run_tasks() {
match try_join!(task_with_result(), task_with_result()) {
Ok(_) => println!("Both tasks completed successfully."),
Err(e) => println!("One of the tasks failed: {e}"),
}
}
20. Using tokio::signal for Graceful Shutdowns
For applications that need to handle shutdown signals (like Ctrl+C), use tokio::signal:
use tokio::signal;
#[tokio::main]
async fn main() {
tokio::spawn(async {
println!("Server is running...");
signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
println!("Shutdown signal received.");
});
// Main application logic
loop {
// Busy while loop
}
}
Use Case: Helps in creating graceful shutdowns for servers or long-running tasks.