Concurrency & Async

Rust concurrency patterns: threads, channels, mutexes, async/await, and Tokio

Rust provides two concurrency models: OS threads for CPU-parallel work and async/await for I/O-concurrent work. The type system enforces thread safety at compile time via Send and Sync traits.

OS Threads

use std::thread;
use std::time::Duration;

// Spawn a thread
let handle = thread::spawn(|| {
    for i in 1..=5 {
        println!("thread: {}", i);
        thread::sleep(Duration::from_millis(100));
    }
});

// Wait for thread to finish
handle.join().unwrap();

// Move data into a thread
let data = vec![1, 2, 3];
let handle = thread::spawn(move || {
    println!("{:?}", data);  // data is moved into the thread
});
handle.join().unwrap();
// data is no longer available here — it was moved
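
The handle returned by thread::spawn also carries the closure's return value, which join() hands back inside a Result. The sketch below splits a sum across a few threads to show that; the helper name parallel_sum and its parts parameter are illustrative assumptions, not something defined above.

```rust
use std::thread;

/// Sums `values` by splitting them into roughly `parts` chunks, one thread each.
/// `parallel_sum` is a hypothetical helper used only for illustration.
fn parallel_sum(values: Vec<u64>, parts: usize) -> u64 {
    let parts = parts.max(1);
    // Round up so every element lands in a chunk; chunks(0) would panic.
    let chunk_len = ((values.len() + parts - 1) / parts).max(1);

    let handles: Vec<thread::JoinHandle<u64>> = values
        .chunks(chunk_len)
        .map(|chunk| {
            let owned = chunk.to_vec(); // each thread owns its own copy
            thread::spawn(move || owned.iter().sum::<u64>())
        })
        .collect();

    // join() returns Err only if the worker panicked.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .sum()
}

fn main() {
    let total = parallel_sum((1..=100).collect(), 4);
    println!("total = {total}");
}
```

Copying each chunk keeps the example simple; scoped threads avoid the copy when borrowing is enough.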

Scoped threads (Rust 1.63+)

Scoped threads can borrow from the parent scope — no need for Arc:

use std::thread;

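let mut data = vec![1, 2, 3, 4, 5];

// Split into two non-overlapping slices so each thread gets its own borrow.
// Reading &data in one thread while another mutates data[0] is rejected
// by the borrow checker.
let (head, tail) = data.split_at_mut(1);

thread::scope(|s| {
    s.spawn(|| {
        println!("read: {:?}", tail);  // shared borrow of the tail
    });

    s.spawn(|| {
        head[0] += 1;  // mutable borrow of a disjoint element
    });
});
// All threads are joined before scope returns, so data is usable again here
println!("data: {:?}", data);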
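
Scoped threads pair naturally with chunks_mut, which hands out non-overlapping mutable slices. The following is a minimal sketch of in-place parallel processing under that assumption; double_in_place and chunk_len are hypothetical names chosen for the example.

```rust
use std::thread;

/// Doubles every element in place, using one scoped thread per chunk.
/// `double_in_place` is a hypothetical helper for illustration.
fn double_in_place(data: &mut [i32], chunk_len: usize) {
    thread::scope(|s| {
        // chunks_mut yields disjoint &mut slices, so no locking is needed.
        for chunk in data.chunks_mut(chunk_len.max(1)) {
            s.spawn(move || {
                for x in chunk.iter_mut() {
                    *x *= 2;
                }
            });
        }
    }); // every spawned thread is joined here
}

fn main() {
    let mut values = vec![1, 2, 3, 4, 5];
    double_in_place(&mut values, 2);
    println!("{:?}", values);
}
```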

Channels — Message Passing

use std::sync::mpsc;  // multiple producer, single consumer
use std::thread;

// Create a channel
let (tx, rx) = mpsc::channel();

// Spawn producer thread
thread::spawn(move || {
    let messages = vec![
        String::from("hello"),
        String::from("from"),
        String::from("thread"),
    ];
    for msg in messages {
        tx.send(msg).unwrap();
    }
});

// Receive in main thread
for msg in rx {
    println!("{}", msg);
}

// Multiple producers: clone the transmitter
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
let tx2 = tx.clone();

thread::spawn(move || { tx1.send("from thread 1").unwrap(); });
thread::spawn(move || { tx2.send("from thread 2").unwrap(); });

// The receiver's iterator ends only once every sender has been dropped,
// so drop the original tx before draining rx
drop(tx);
for msg in rx {
    println!("{}", msg);
}
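
A common use of cloned senders is fan-in: several workers report results to one collector. The sketch below assumes a trivial workload (squaring the worker id); collect_squares is a hypothetical name introduced for the example.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `workers` producers that each send their id squared, then gathers
/// every message. `collect_squares` is a hypothetical helper for illustration.
fn collect_squares(workers: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();

    for id in 0..workers {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(id * id).expect("receiver dropped");
        });
    }
    // Drop the original sender; otherwise rx.iter() below would never end.
    drop(tx);

    let mut results: Vec<u32> = rx.iter().collect();
    results.sort_unstable(); // arrival order is not deterministic
    results
}

fn main() {
    println!("{:?}", collect_squares(4));
}
```

Sorting is only there to make the output stable; messages arrive in whatever order the threads happen to run.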

Shared State — Mutex & Arc

When threads need to share mutable data, wrap it in Arc<Mutex<T>>: Arc gives every thread shared ownership, and Mutex ensures only one thread accesses the value at a time.

use std::sync::{Arc, Mutex};
use std::thread;

let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
    let counter = Arc::clone(&counter);
    let handle = thread::spawn(move || {
        let mut num = counter.lock().unwrap();
        *num += 1;
    }); // lock is released here when num goes out of scope
    handles.push(handle);
}

for handle in handles {
    handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap()); // 10
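
The same pattern extends to richer data than a counter. Below is a minimal sketch that shares a HashMap between threads and keeps the locked region short by doing the string work before taking the lock; count_words is a hypothetical helper and one-thread-per-line is chosen only for brevity.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Counts words across `lines`, one thread per line.
/// `count_words` is a hypothetical helper for illustration.
fn count_words(lines: Vec<String>) -> HashMap<String, usize> {
    let counts: Arc<Mutex<HashMap<String, usize>>> = Arc::new(Mutex::new(HashMap::new()));
    let mut handles = Vec::new();

    for line in lines {
        let counts = Arc::clone(&counts);
        handles.push(thread::spawn(move || {
            // Split outside the lock to keep the critical section short.
            let words: Vec<String> = line.split_whitespace().map(str::to_owned).collect();
            let mut map = counts.lock().unwrap();
            for word in words {
                *map.entry(word).or_insert(0) += 1;
            }
        })); // guard dropped at the end of the closure
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Bind the clone to a local so the guard is released before returning.
    let result = counts.lock().unwrap().clone();
    result
}

fn main() {
    let counts = count_words(vec!["a b a".to_string(), "b c".to_string()]);
    println!("{:?}", counts);
}
```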

RwLock — read-heavy workloads

use std::sync::{Arc, RwLock};

let data = Arc::new(RwLock::new(vec![1, 2, 3]));

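// Multiple readers can hold the lock at the same time
{
    let r1 = data.read().unwrap();
    let r2 = data.read().unwrap();
    println!("{} {}", r1.len(), r2.len());
} // both read guards are dropped here

// Single writer: write() blocks until every read guard is released,
// so calling it while r1/r2 were still alive on this thread would deadlock
let mut w = data.write().unwrap();
w.push(4);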
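
To see the reader/writer split across threads, the sketch below lets several readers sum the vector in parallel and then takes the write lock once they are done. The function name append_after_reads and its parameters are hypothetical, chosen for this example.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Runs `readers` threads that each sum the data, then appends `extra`.
/// Returns the sums seen by the readers and the final contents.
/// `append_after_reads` is a hypothetical helper for illustration.
fn append_after_reads(initial: Vec<i32>, extra: i32, readers: usize) -> (Vec<i32>, Vec<i32>) {
    let data = Arc::new(RwLock::new(initial));

    // Readers may hold the lock simultaneously.
    let handles: Vec<_> = (0..readers)
        .map(|_| {
            let data = Arc::clone(&data);
            thread::spawn(move || {
                let guard = data.read().unwrap();
                guard.iter().sum::<i32>()
            })
        })
        .collect();
    let sums: Vec<i32> = handles.into_iter().map(|h| h.join().unwrap()).collect();

    // All readers have been joined, so no read guard is left alive.
    data.write().unwrap().push(extra);

    let snapshot = data.read().unwrap().clone();
    (sums, snapshot)
}

fn main() {
    let (sums, snapshot) = append_after_reads(vec![1, 2, 3], 4, 3);
    println!("sums = {:?}, data = {:?}", sums, snapshot);
}
```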

When to use what

| Pattern | Use when |
|---|---|
| Channels | Threads communicate by sending messages |
| Arc<Mutex<T>> | Threads share and mutate the same data |
| Arc<RwLock<T>> | Read-heavy, write-rare shared data |
| Scoped threads | Threads need to borrow parent data |

Send and Sync

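The compiler enforces thread safety through two marker traits. A type is Send if a value of it can be moved to another thread, and Sync if it can be shared between threads by reference (that is, &T is Send). Both are derived automatically when every field qualifies:

// Send + Sync: i32, String, and containers such as Vec<T> or HashMap<K, V> when their contents are
// NOT Send (and NOT Sync): Rc<T> (non-atomic reference counting)
// Send but NOT Sync: Cell<T>, RefCell<T> for T: Send (interior mutability without locks)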

// Rc<T> — single-threaded reference counting (NOT Send)
use std::rc::Rc;
let rc = Rc::new(42);
// thread::spawn(move || { println!("{}", rc); }); // ERROR: Rc is not Send

// Arc<T>: atomic reference counting (Send + Sync when T is Send + Sync)
use std::sync::Arc;
use std::thread;
let arc = Arc::new(42);
thread::spawn(move || { println!("{}", arc); }).join().unwrap(); // OK
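
One way to check these properties for your own types is a pair of empty generic functions whose only job is to make the compiler test a bound. This is a sketch; assert_send and assert_sync are hypothetical helpers, not standard library functions.

```rust
use std::cell::RefCell;
use std::sync::{Arc, Mutex};

// Hypothetical helpers: they do nothing at runtime and exist only so the
// compiler verifies the trait bound on `T`.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn main() {
    assert_send::<String>();
    assert_sync::<Vec<i32>>();
    assert_send::<Arc<Mutex<i32>>>();
    assert_sync::<Arc<Mutex<i32>>>();
    assert_send::<RefCell<i32>>(); // Send, but not Sync

    // Uncommenting either line below produces a compile error:
    // assert_send::<std::rc::Rc<i32>>();
    // assert_sync::<RefCell<i32>>();

    println!("all bounds hold");
}
```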

Async/Await

For I/O-heavy work (network calls, file I/O, databases), async/await usually scales better than one thread per operation, because a task that is waiting does not occupy an OS thread:

use tokio::time::{sleep, Duration};

async fn fetch_data(id: u32) -> String {
    sleep(Duration::from_millis(100)).await; // simulate I/O
    format!("data for {}", id)
}

async fn process() {
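    // Sequential: the second call starts only after the first completes
    let data1 = fetch_data(1).await;
    let data2 = fetch_data(2).await;
    println!("{data1}, {data2}");

    // Concurrent: join! polls both futures on the current task
    let (data1, data2) = tokio::join!(fetch_data(1), fetch_data(2));
    println!("{data1}, {data2}");

    // Concurrent: spawn each future as its own task, then await the handles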
    let handles: Vec<_> = (1..=5)
        .map(|i| tokio::spawn(fetch_data(i)))
        .collect();

    for handle in handles {
        let result = handle.await.unwrap();
        println!("{}", result);
    }
}

#[tokio::main]
async fn main() {
    process().await;
}

Tokio runtime

# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }

// Two alternative ways to run async code (use one per binary)
#[tokio::main]
async fn main() {
    // Your async code here
}

// Or manually
fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        // Your async code here
    });
}
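
To make concrete what block_on does conceptually, here is a toy single-future executor built only on the standard library. It is a sketch for intuition, not how Tokio is implemented: ThreadWaker, YieldOnce, and add_after_yield are hypothetical names, and a real runtime adds I/O polling, timers, and a task scheduler.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Polls one future to completion, parking the thread while it is pending.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(), // sleep until wake() is called
        }
    }
}

/// Future that returns Pending once (after requesting a wake-up), then Ready.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

async fn add_after_yield(a: i32, b: i32) -> i32 {
    YieldOnce(false).await; // forces one trip through the Pending branch
    a + b
}

fn main() {
    let sum = block_on(add_after_yield(2, 3));
    println!("sum = {sum}");
}
```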

Async channels

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    tokio::spawn(async move {
        tx.send("hello".to_string()).await.unwrap();
        tx.send("world".to_string()).await.unwrap();
    });

    while let Some(msg) = rx.recv().await {
        println!("{}", msg);
    }
}

Select — wait on multiple async operations

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel::<i32>(32);
    let (tx2, mut rx2) = mpsc::channel::<i32>(32);

    tokio::spawn(async move {
        tx1.send(1).await.unwrap();
    });

    tokio::spawn(async move {
        sleep(Duration::from_millis(50)).await;
        tx2.send(2).await.unwrap();
    });

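    loop {
        tokio::select! {
            // A branch whose pattern fails (recv() returned None because the
            // channel closed) is disabled for that select! call
            Some(val) = rx1.recv() => println!("from rx1: {}", val),
            Some(val) = rx2.recv() => println!("from rx2: {}", val),
            // The sleep is recreated on every iteration, so this branch fires
            // after one second without any message
            _ = sleep(Duration::from_secs(1)) => {
                println!("timeout");
                break;
            }
        }
    }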
}

Async traits

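// As of Rust 1.75, async fn in traits is stable.
// Caveats: the returned future has no Send bound by default, and a trait
// with async fn cannot be used as a dyn Trait object.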
trait Fetcher {
    async fn fetch(&self, url: &str) -> Result<String, reqwest::Error>;
}

struct HttpClient;

impl Fetcher for HttpClient {
    async fn fetch(&self, url: &str) -> Result<String, reqwest::Error> {
        reqwest::get(url).await?.text().await
    }
}

Choosing between threads and async

| | OS Threads | Async/Await |
|---|---|---|
| Best for | CPU-bound work, parallel computation | I/O-bound work, many connections |
| Overhead | ~2 MiB default stack per thread (std default on Tier-1 platforms, configurable) | Often a few hundred bytes per task, depending on the future |
| Scale | Hundreds to thousands | Up to millions of concurrent tasks |
| Complexity | Simpler model, Mutex/Arc | More complex, Pin/Send bounds |
| Ecosystem | Standard library | Tokio, async-std |

Mixing both: use tokio::task::spawn_blocking for CPU-bound or blocking work inside async code.