Concurrency & Async
Rust concurrency patterns — threads, channels, mutex, async/await, tokio, and common patterns
Rust provides two concurrency models: OS threads for CPU-parallel work and async/await for I/O-concurrent work. The type system enforces thread safety at compile time via Send and Sync traits.
OS Threads
use std::thread;
use std::time::Duration;
// Spawn a thread
let handle = thread::spawn(|| {
for i in 1..=5 {
println!("thread: {}", i);
thread::sleep(Duration::from_millis(100));
}
});
// Wait for thread to finish
handle.join().unwrap();
// Move data into a thread
let data = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("{:?}", data); // data is moved into the thread
});
handle.join().unwrap();
// data is no longer available here — it was moved
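A thread's closure can also return a value, which `join()` hands back to the caller. The following is a minimal sketch of that pattern using only the standard library; `parallel_sum` and its `parts` parameter are illustrative names, not part of the examples above.

```rust
use std::thread;

/// Sums `data` by splitting it into roughly `parts` chunks,
/// summing each chunk on its own thread, then adding the partial sums.
fn parallel_sum(data: Vec<u64>, parts: usize) -> u64 {
    let parts = parts.max(1);
    // Ceiling division, and never 0, because `chunks(0)` panics.
    let chunk_len = ((data.len() + parts - 1) / parts).max(1);

    let handles: Vec<thread::JoinHandle<u64>> = data
        .chunks(chunk_len)
        .map(|chunk| {
            // `thread::spawn` requires 'static data, so each thread gets its own copy.
            let owned = chunk.to_vec();
            thread::spawn(move || owned.iter().sum::<u64>())
        })
        .collect();

    // `join()` returns Err if the thread panicked; otherwise the closure's return value.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .sum()
}
```

Calling `parallel_sum((1..=100).collect(), 4)` should return 5050. Copying each chunk is the price of the `'static` bound on `thread::spawn`; for large inputs, borrowing via scoped threads avoids the copy.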
Scoped threads (Rust 1.63+)
Scoped threads can borrow from the parent scope — no need for Arc:
use std::thread;
let mut data = vec![1, 2, 3, 4, 5];
// Split into two disjoint halves so that each thread gets its own borrow;
// borrowing all of data immutably and mutably at once would not compile
let (left, right) = data.split_at_mut(2);
thread::scope(|s| {
    // One thread reads the left half
    s.spawn(|| {
        println!("read: {:?}", left);
    });
    // Another thread mutates the right half
    s.spawn(|| {
        right[0] += 1;
    });
});
// All threads are joined before scope returns, so data is safe to use here
println!("data: {:?}", data);
Channels: Message Passing
use std::sync::mpsc; // multiple producer, single consumer
use std::thread;
// Create a channel
let (tx, rx) = mpsc::channel();
// Spawn producer thread
thread::spawn(move || {
let messages = vec![
String::from("hello"),
String::from("from"),
String::from("thread"),
];
for msg in messages {
tx.send(msg).unwrap();
}
});
// Receive in main thread
for msg in rx {
println!("{}", msg);
}
// Multiple producers — clone the transmitter
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
let tx2 = tx.clone();
thread::spawn(move || { tx1.send("from thread 1").unwrap(); });
thread::spawn(move || { tx2.send("from thread 2").unwrap(); });
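// tx is still usable here; drop it when finished, because iterating
// over rx only ends once every sender has been dropped
drop(tx);
for msg in rx {
    println!("{}", msg);
}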
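The multiple-producer case can be sketched as a small fan-in helper: each worker gets its own clone of the sender, and the original sender is dropped so that the receiving loop terminates. The function name `collect_from_workers` and the squared-id payload are assumptions made for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `worker_count` threads that each send one value,
/// then gathers everything the workers sent.
fn collect_from_workers(worker_count: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();

    for id in 0..worker_count {
        let tx = tx.clone(); // one sender per worker
        thread::spawn(move || {
            // `send` only fails if the receiver has already been dropped.
            tx.send(id * id).expect("receiver dropped");
        });
    }

    // Drop the original sender; otherwise `rx.iter()` would block forever
    // waiting for a message that can never arrive.
    drop(tx);

    // Arrival order depends on scheduling, so sort for a stable result.
    let mut results: Vec<u32> = rx.iter().collect();
    results.sort_unstable();
    results
}
```

With four workers this should produce `[0, 1, 4, 9]`. No explicit `join()` is needed here, since the receiver loop ends only after every worker has dropped its sender.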
Shared State — Mutex & Arc
When threads need to share mutable data, protect it with a Mutex<T> and share ownership of that mutex with Arc, giving Arc<Mutex<T>>:
use std::sync::{Arc, Mutex};
use std::thread;
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
}); // lock is released here when num goes out of scope
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap()); // 10
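Because the lock is held for as long as the guard is alive, it usually pays to do the expensive work first and lock only for the update. Below is a hedged sketch of that idea; `tally_lengths` is a hypothetical helper that counts how many words have each length.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Counts words by length, one thread per word, sharing a single map.
fn tally_lengths(words: Vec<String>) -> HashMap<usize, usize> {
    let tally: Arc<Mutex<HashMap<usize, usize>>> = Arc::new(Mutex::new(HashMap::new()));

    let handles: Vec<_> = words
        .into_iter()
        .map(|word| {
            let tally = Arc::clone(&tally);
            thread::spawn(move || {
                // Do the per-item work before taking the lock.
                let len = word.chars().count();
                {
                    let mut map = tally.lock().unwrap();
                    *map.entry(len).or_insert(0) += 1;
                } // guard dropped here, so the lock is held only for the update
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Bind the clone to a local so the guard is released before `tally` is dropped.
    let result = tally.lock().unwrap().clone();
    result
}
```

One thread per word is only for demonstration; a real workload would normally batch items per thread.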
RwLock — read-heavy workloads
use std::sync::{Arc, RwLock};
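let data = Arc::new(RwLock::new(vec![1, 2, 3]));
// Multiple readers can hold the lock at the same time
{
    let r1 = data.read().unwrap();
    let r2 = data.read().unwrap(); // fine: multiple readers
    println!("{} {}", r1.len(), r2.len());
} // both read guards are dropped here
// Single writer: write() blocks until every reader has released the lock,
// so this thread must not still hold a read guard or it would deadlock
let mut w = data.write().unwrap();
w.push(4);
When to use what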
| Pattern | Use when |
|---|---|
| Channels | Threads communicate by sending messages |
| Arc<Mutex<T>> | Threads share and mutate the same data |
| Arc<RwLock<T>> | Read-heavy, write-rare shared data |
| Scoped threads | Threads need to borrow parent data |
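As an illustration of the scoped-threads row, here is a minimal sketch in which several threads mutate disjoint parts of a borrowed slice without any Arc or Mutex. The helper name `double_in_place` and the doubling operation are assumptions chosen for the example.

```rust
use std::thread;

/// Doubles every element of `data`, processing `chunk_len` elements per thread.
fn double_in_place(data: &mut [i32], chunk_len: usize) {
    let chunk_len = chunk_len.max(1); // `chunks_mut(0)` would panic

    thread::scope(|s| {
        // `chunks_mut` yields non-overlapping mutable slices, so the borrow
        // checker can prove that the threads never touch the same element.
        for chunk in data.chunks_mut(chunk_len) {
            // `move` transfers the `&mut [i32]` chunk itself into the closure.
            s.spawn(move || {
                for value in chunk.iter_mut() {
                    *value *= 2;
                }
            });
        }
    }); // every spawned thread is joined here
}
```

After `double_in_place(&mut v, 2)` on `vec![1, 2, 3, 4, 5]`, `v` should hold `[2, 4, 6, 8, 10]`.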
Send and Sync
The compiler enforces thread safety automatically:
Send: a type can be transferred to another thread (most types are Send)
Sync: a type can be shared between threads via &T (safe to reference from multiple threads)
// Send + Sync: i32, String, and Vec<T> or HashMap<K, V> when their contents are
// NOT Send (and NOT Sync): Rc<T> (single-threaded reference counting)
// Send but NOT Sync: Cell<T>, RefCell<T> (interior mutability without locks)
// Rc<T> — single-threaded reference counting (NOT Send)
use std::rc::Rc;
let rc = Rc::new(42);
// thread::spawn(move || { println!("{}", rc); }); // ERROR: Rc is not Send
// Arc<T>: atomic reference counting (Send + Sync when T is Send + Sync)
use std::sync::Arc;
use std::thread;
let arc = Arc::new(42);
thread::spawn(move || { println!("{}", arc); }).join().unwrap(); // OK
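One way to see these rules in action is to state them as trait bounds and let the compiler check them. The sketch below uses helper names (`assert_send`, `assert_sync`, `describe_on_thread`) that are purely illustrative; the commented-out lines show the calls that would be rejected.

```rust
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::thread;

// These functions do nothing at runtime; they only compile if the bound holds.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn compile_time_checks() {
    assert_send::<String>();
    assert_sync::<Vec<i32>>();
    assert_send::<Arc<Mutex<i32>>>();
    assert_sync::<Arc<Mutex<i32>>>();
    // assert_send::<std::rc::Rc<i32>>();        // error: Rc<i32> is not Send
    // assert_sync::<std::cell::RefCell<i32>>(); // error: RefCell<i32> is not Sync
}

/// Moves any `Send` value to another thread and formats it there.
fn describe_on_thread<T>(value: T) -> String
where
    T: Send + Debug + 'static,
{
    thread::spawn(move || format!("{:?}", value))
        .join()
        .expect("worker thread panicked")
}
```

`describe_on_thread(Arc::new(42))` compiles and should return `"42"`, whereas passing an `Rc` would fail at compile time with the same "cannot be sent between threads safely" diagnostic.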
Async/Await
For I/O-heavy work (network calls, file I/O, databases), async/await usually scales better than dedicating one OS thread to each operation, because a task that is waiting does not occupy a thread:
use tokio::time::{sleep, Duration};
async fn fetch_data(id: u32) -> String {
sleep(Duration::from_millis(100)).await; // simulate I/O
format!("data for {}", id)
}
async fn process() {
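// Sequential: the second call starts only after the first finishes (~200 ms total)
let data1 = fetch_data(1).await;
let data2 = fetch_data(2).await;
println!("{} / {}", data1, data2);
// Concurrent: both futures make progress together on this task (~100 ms total)
let (data1, data2) = tokio::join!(fetch_data(1), fetch_data(2));
println!("{} / {}", data1, data2);
// Concurrent: spawn one task per future and collect the results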
let handles: Vec<_> = (1..=5)
.map(|i| tokio::spawn(fetch_data(i)))
.collect();
for handle in handles {
let result = handle.await.unwrap();
println!("{}", result);
}
}
#[tokio::main]
async fn main() {
process().await;
}
Tokio runtime
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
// Multiple ways to run async code
#[tokio::main]
async fn main() {
// Your async code here
}
// Or manually
fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
// Your async code here
});
}
Async channels
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<String>(32);
tokio::spawn(async move {
tx.send("hello".to_string()).await.unwrap();
tx.send("world".to_string()).await.unwrap();
});
while let Some(msg) = rx.recv().await {
println!("{}", msg);
}
}
Select: wait on multiple async operations
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel::<i32>(32);
let (tx2, mut rx2) = mpsc::channel::<i32>(32);
tokio::spawn(async move {
tx1.send(1).await.unwrap();
});
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await;
tx2.send(2).await.unwrap();
});
loop {
tokio::select! {
Some(val) = rx1.recv() => println!("from rx1: {}", val),
Some(val) = rx2.recv() => println!("from rx2: {}", val),
_ = sleep(Duration::from_secs(1)) => {
println!("timeout");
break;
}
}
}
}
Async traits
// As of Rust 1.75, async fn in traits is stable (static dispatch only:
// a trait with an async fn cannot be used as dyn Trait)
trait Fetcher {
async fn fetch(&self, url: &str) -> Result<String, reqwest::Error>;
}
struct HttpClient;
impl Fetcher for HttpClient {
async fn fetch(&self, url: &str) -> Result<String, reqwest::Error> {
reqwest::get(url).await?.text().await
}
}
Choosing between threads and async
| | OS Threads | Async/Await |
|---|---|---|
| Best for | CPU-bound work, parallel computation | I/O-bound work, many connections |
| Overhead | ~2 MiB of stack reserved per thread by default (std::thread) | Typically a few hundred bytes per task, depending on the size of the future |
| Scale | Hundreds to thousands | Up to millions of concurrent tasks |
| Complexity | Simpler model, Mutex/Arc | More complex, Pin/Send bounds |
| Ecosystem | Standard library | Tokio, async-std |

Mixing the two: use tokio::task::spawn_blocking to run CPU-bound or blocking work from async code.
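On the thread side, most of the per-thread cost is the stack reservation, which the standard library documents as currently 2 MiB by default on tier-1 platforms. When many threads with shallow call stacks are needed, `std::thread::Builder` allows a smaller stack to be requested. The sketch below assumes a hypothetical helper `run_with_stack`; the thread name and the 64 KiB figure in the usage note are arbitrary choices, and the platform may round the requested size up to its own minimum.

```rust
use std::io;
use std::thread;

/// Runs `f` on a new thread with the requested stack size and returns its result.
fn run_with_stack<T, F>(stack_bytes: usize, f: F) -> io::Result<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let handle = thread::Builder::new()
        .name("small-stack-worker".into())
        .stack_size(stack_bytes)
        // Unlike `thread::spawn`, `Builder::spawn` reports OS-level failures as an error.
        .spawn(f)?;

    Ok(handle.join().expect("worker thread panicked"))
}
```

For example, `run_with_stack(64 * 1024, || 2 + 2)` should return `Ok(4)`. Deep recursion or large stack-allocated buffers inside `f` would need a correspondingly larger value.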