Tokio Async APIs - Random Notes

In this article, we will explore how to effectively use Tokio and the Futures crate for asynchronous programming in Rust. We’ll cover setting up your project, creating runtimes, spawning tasks, working with channels, handling mutexes, and more. Let’s dive in!


1. Setting Up Tokio and Futures in Your Rust Project

To get started, add Tokio and Futures as dependencies. The following commands update your project's Cargo.toml file:

# Run the following commands in your terminal:
cargo add tokio -F full     # Adds Tokio with all features enabled
cargo add futures           # Adds the Futures crate

By enabling the full feature set, you'll have access to all of Tokio's modules, such as networking, timers, and synchronization primitives.


2. Creating a Tokio Runtime

A Tokio runtime allows you to run asynchronous code. You can create the runtime manually:

use tokio::runtime::Runtime;

async fn async_fun() {
    println!("Hello from async");
}

fn main() {
    // Build a multi-threaded runtime and block until the future completes
    let rt = Runtime::new().unwrap();
    rt.block_on(async_fun());
}

Alternatively, you can make main itself async with the #[tokio::main] macro. The two variants below are alternatives; a program defines only one main:

async fn async_fun() {
    println!("Hello from async");
}

// Variant 1: default multi-threaded runtime
#[tokio::main]
async fn main() {
    async_fun().await;
}

// Variant 2: single-threaded runtime
#[tokio::main(flavor = "current_thread")]
async fn main() {
    async_fun().await;
}


3. Avoid Blocking in Async Code

Important Note: Never call std::thread::sleep() inside an async task. It blocks the worker thread it runs on, so every other task scheduled on that thread stalls until the sleep finishes. Instead, use tokio::time::sleep():

tokio::time::sleep(std::time::Duration::from_secs(1)).await;        

This allows the Tokio runtime to continue running other tasks while the current one sleeps, preserving concurrency.


4. Spawning Asynchronous Tasks

To run an async operation concurrently, use tokio::spawn:

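async fn async_fun() {
    for i in 0..10 {
        println!("Jai Shree Ram {i}");
    }
}

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async_fun());
    // Wait for the task; otherwise main may return before the task runs
    handle.await.unwrap();
}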

You can also pass an async block directly to spawn:

tokio::spawn(async move {
    // Async work here
});        

Note: tokio::spawn returns a JoinHandle. The task starts running in the background right away; await the handle when you need the task's return value or need to know that it has finished.


5. Blocking Operations in Tokio

For blocking code, use tokio::task::spawn_blocking. It runs the closure on a dedicated pool of blocking threads, so the async worker threads stay free to drive other tasks:

use tokio::task::spawn_blocking;
use std::time::Duration;

let result = spawn_blocking(move || {
    // Blocking operation
    std::thread::sleep(Duration::from_millis(500));
}).await;        

Detached Blocking Tasks:

spawn_blocking starts running the closure as soon as it is called. If you do not need the result, skip the .await and let the task run detached:

let _handle = spawn_blocking(move || {
    // Detached blocking operation
    std::thread::sleep(Duration::from_millis(500));
});

6. Yielding Control in Async Tasks

To yield control back to the scheduler so that other tasks get a chance to run (for example, inside a long-running loop), use:

tokio::task::yield_now().await;        

7. Channels in Tokio

Using tokio::sync::mpsc Channels:

Create a multi-producer, single-consumer (MPSC) channel:

let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(100);

// Note: Both `send()` and `recv()` need to be awaited.        

The receiver must be declared mutable (mut) because recv() takes &mut self: receiving a message changes the receiver's internal state. The sender can be cloned to create additional producers.

Using Broadcast Channels:

Tokio provides broadcast channels, which allow multiple receivers:

let (tx, mut rx) = tokio::sync::broadcast::channel::<String>(100);
        

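Note: The standard library's std::sync::mpsc has no broadcast equivalent. Every broadcast receiver sees each message sent after it subscribed, so the message type must implement Clone.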

8. Handling Blocking and Async Operations Together

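You can obtain a handle to the current Tokio runtime. Handle::current() must be called from within a runtime context (for example, inside an async task) and panics otherwise: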

let handle = tokio::runtime::Handle::current();        

This handle can be used to spawn async tasks from a synchronous context:

handle.spawn(async move {
    // Perform async operations
});        

9. Canceling Async Tasks

You can cancel tasks using the abort() method on the JoinHandle:

async fn run() {
    let handle = tokio::spawn(async {
        // Some async task
    });
    
    // Cancel the task
    handle.abort();
}        

10. Using tokio::sync::Mutex

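Holding the standard library's Mutex across an .await point is a problem: a contended lock() blocks the whole worker thread, and the guard is not Send, so the task usually cannot be spawned. When a lock must be held across an .await, use Tokio's async mutex, whose lock() yields to the scheduler instead of blocking. (For short critical sections with no .await inside, std::sync::Mutex remains a reasonable choice.)

use tokio::sync::Mutex;
// Requires the once_cell crate: cargo add once_cell
use once_cell::sync::Lazy;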

static COUNTER: Lazy<Mutex<u32>> = Lazy::new(|| Mutex::new(0));

async fn increment() {
    let mut counter = COUNTER.lock().await;
    *counter += 1;
}        

11. Selecting Between Futures Using tokio::select!

You can use tokio::select! to poll multiple futures concurrently and act on the first one that completes. The futures in the remaining branches are dropped, which cancels them:

tokio::select! {
    _ = function1() => println!("function1() completed first"),
    _ = function2() => println!("function2() completed first"),
}        

12. Complete Example: Async Channels in Action

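Here is a complete example that bridges a synchronous thread and async tasks using channels:

use std::time::Duration;

// Commands sent from the async main loop to the worker thread
enum Command {
    Print(String),
}

#[tokio::main]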
async fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<Command>();
    let (tx_reply, mut rx_reply) = tokio::sync::mpsc::channel::<String>(10);
    
    let handle = tokio::runtime::Handle::current();
    std::thread::spawn(move || {
        while let Ok(command) = rx.recv() {
            match command {
                Command::Print(s) => {
                    let tx_reply = tx_reply.clone();
                    handle.spawn(async move {
                        tx_reply.send(s).await.unwrap();
                    });
                },
            }
        }
    });

    tokio::spawn(async move {
        while let Some(reply) = rx_reply.recv().await {
            println!("{reply}");
        }
    });

    let mut counter = 0;
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        tx.send(Command::Print(format!("Hello {counter}"))).unwrap();
        counter += 1;
    }
}        

Note: In this example only the Tokio channel's send() and recv() are awaited; the std::sync::mpsc calls are synchronous. The Tokio receiver must be mutable.



13. Timeout Handling with tokio::time::timeout

Sometimes you need to ensure that an async operation completes within a specific duration. Tokio provides a way to enforce timeouts using tokio::time::timeout:

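use tokio::time::{timeout, Duration};

// Placeholder for the operation you want to bound
async fn async_task() {
    tokio::time::sleep(Duration::from_secs(1)).await;
}
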
async fn run_with_timeout() {
    let result = timeout(Duration::from_secs(5), async_task()).await;
    match result {
        Ok(_) => println!("Task completed within the timeout."),
        Err(_) => println!("Task timed out!"),
    }
}        

Note: If the task does not complete within the specified time, timeout returns an Err and the task's future is dropped, which cancels it.

14. Interval-Based Operations with tokio::time::interval

For tasks that need to run periodically, use tokio::time::interval. This creates a recurring timer; note that the first tick completes immediately:

use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() {
    let mut interval = interval(Duration::from_secs(2));
    loop {
        interval.tick().await;
        println!("This message prints every 2 seconds.");
    }
}        

15. One-Shot Channels with tokio::sync::oneshot

oneshot channels are used for sending a single message between tasks:

use tokio::sync::oneshot;

async fn example() {
    let (tx, rx) = oneshot::channel();
    
    tokio::spawn(async move {
        let _ = tx.send("Hello, world!");
    });
    
    let message = rx.await.expect("Failed to receive message");
    println!("{message}");
}        


Use Case: Useful when you want to notify one task from another or send a single response.

16. Reading and Writing Files Asynchronously

Tokio provides tokio::fs for async file operations. You can read from or write to files without blocking the main async runtime:

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

async fn read_write_example() -> io::Result<()> {
    let mut file = File::open("foo.txt").await?;
    let mut contents = vec![];
    file.read_to_end(&mut contents).await?;
    
    let mut file = File::create("bar.txt").await?;
    file.write_all(&contents).await?;
    
    Ok(())
}        


17. Using tokio::sync::RwLock for Async Read-Write Locking

For scenarios where you need concurrent read access but exclusive write access, tokio::sync::RwLock can be used:

use tokio::sync::RwLock;
use std::sync::Arc;

async fn use_rwlock() {
    let data = Arc::new(RwLock::new(5));
    
    let read_data = data.read().await;
    println!("Read: {}", *read_data);
    
    // Drop the read lock before acquiring the write lock
    drop(read_data);
    
    let mut write_data = data.write().await;
    *write_data += 1;
}        


18. Using tokio::join! for Concurrent Task Execution

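If you need to run multiple async operations concurrently and wait for all of them to complete, use tokio::join!. The futures are polled concurrently on the current task, not on separate threads:

use std::time::Duration;
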
async fn task1() {
    tokio::time::sleep(Duration::from_secs(2)).await;
    println!("Task 1 completed");
}

async fn task2() {
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("Task 2 completed");
}

#[tokio::main]
async fn main() {
    tokio::join!(task1(), task2());
    println!("Both tasks have completed.");
}        

19. Combining Futures with futures::join! and futures::try_join!

While tokio::join! (like futures::join!) waits for every future to finish, futures::try_join! returns early as soon as any future resolves to an Err. Tokio offers an equivalent tokio::try_join!:

use futures::try_join;

async fn task_with_result() -> Result<(), &'static str> {
    Ok(())
}

async fn run_tasks() {
    match try_join!(task_with_result(), task_with_result()) {
        Ok(_) => println!("Both tasks completed successfully."),
        Err(e) => println!("One of the tasks failed: {e}"),
    }
}        


20. Using tokio::signal for Graceful Shutdowns

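For applications that need to handle shutdown signals (like Ctrl+C), use tokio::signal. Race the signal against the application logic with tokio::select! so that main can exit when the signal arrives; a busy loop in main would block a worker thread and never shut down:

use tokio::signal;
use tokio::time::{sleep, Duration};

// Stand-in for the main application logic
async fn run_server() {
    loop {
        sleep(Duration::from_secs(1)).await;
    }
}

#[tokio::main]
async fn main() {
    println!("Server is running...");

    tokio::select! {
        // Resolves when the process receives Ctrl+C
        result = signal::ctrl_c() => {
            result.expect("Failed to listen for Ctrl+C");
            println!("Shutdown signal received.");
        }
        _ = run_server() => {}
    }

    // Clean up resources here before main returns
}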

Use Case: Helps in creating graceful shutdowns for servers or long-running tasks.


Reference: Asynchronous Rust - KLA Training (Dec 2023) (bracketproductions.com)
