Chapter 16: Fearless Concurrency
Using Threads to Run Code Simultaneously
Waiting for All Threads to Finish
Spawning a thread returns a JoinHandle, which is an owned value that exposes the join method. When called, join blocks the calling (current) thread until the thread represented by the handle completes.
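The behaviour of join can be sketched as follows. This is a minimal illustration, not code from the book: the helper name spawn_and_join is hypothetical, and the summing loop only stands in for real work.

```rust
use std::thread;

// Hypothetical helper: runs a small computation on a new thread and
// waits for its result.
fn spawn_and_join() -> u32 {
    // thread::spawn returns a JoinHandle; the closure's return value
    // travels back through that handle.
    let handle = thread::spawn(|| {
        let mut total: u32 = 0;
        for i in 1..=10 {
            total += i;
        }
        total
    });

    // join blocks the current thread until the spawned thread finishes.
    // It returns an Err if the spawned thread panicked.
    handle.join().unwrap()
}

fn main() {
    println!("Sum computed on the spawned thread: {}", spawn_and_join());
}
```

Without the call to join, main could return before the spawned thread has finished its work.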
Using move Closures with Threads
A move closure lets a spawned thread use data that belongs to the thread that created it. The closure takes ownership of the values it captures, moving them into the new thread's environment.
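A small sketch of this ownership transfer, under the assumption that we just want to measure some strings on another thread. The helper name total_len_on_thread is hypothetical.

```rust
use std::thread;

// Hypothetical helper: moves a vector into a new thread and returns the
// combined length of its strings.
fn total_len_on_thread(words: Vec<String>) -> usize {
    // `move` transfers ownership of `words` into the closure, so the
    // spawned thread owns the data for as long as it runs.
    let handle = thread::spawn(move || words.iter().map(|w| w.len()).sum::<usize>());

    // `words` has been moved, so using it here would not compile:
    // println!("{:?}", words);

    handle.join().unwrap()
}

fn main() {
    let words = vec![String::from("hello"), String::from("thread")];
    println!("Total length: {}", total_len_on_thread(words));
}
```

Without move, the closure would only borrow words, and the compiler would reject the code because the spawned thread might outlive the borrowed value.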
Using Message Passing to Transfer Data Between Threads
Rust's major abstraction for accomplishing message-sending concurrency is the channel.
A channel has two halves: a transmitter and a receiver. Rust's implementation allows for multiple producers and a single consumer, hence the module name mpsc (multiple producer, single consumer).
A channel is said to be closed if either half (sender or receiver) of a channel is dropped.
In the code below, we spawn a new thread that says hello to the main thread:

    use std::sync::mpsc;
    use std::thread;

    fn main() {
        let (tx, rx) = mpsc::channel();

        thread::spawn(move || {
            let value = String::from("hello");
            tx.send(value).unwrap();
        });

        // recv blocks until a value arrives or the channel is closed.
        let received = rx.recv().unwrap();
        println!("Transmitter said {}", received);
    }
Some things to note from the example:
- tx.send() returns a Result because the receiving end may already have been dropped.
- tx.send(value) takes ownership of value, so value is no longer usable in the sending thread afterwards.
- Once the sender thread has finished and its transmitter has been dropped, calling recv() in the main thread returns an Err, which indicates that no more values will be coming down the channel.
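The two error cases from the notes above can be observed directly. This is a minimal sketch; the helper names send_after_receiver_dropped and recv_after_sender_dropped are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: sending fails once the receiver has been dropped.
fn send_after_receiver_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<String>();
    drop(rx);
    tx.send(String::from("hello")).is_err()
}

// Hypothetical helper: a value sent before the transmitter was dropped is
// still delivered; only the following recv reports the closed channel.
fn recv_after_sender_dropped() -> (Option<i32>, bool) {
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {
        tx.send(1).unwrap();
        // tx is dropped here, when the closure ends.
    });
    handle.join().unwrap();

    let first = rx.recv().ok();
    let closed = rx.recv().is_err();
    (first, closed)
}

fn main() {
    println!("send failed: {}", send_after_receiver_dropped());
    println!("recv results: {:?}", recv_after_sender_dropped());
}
```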
Tip: Instead of recv, you can also use try_recv, which does not block the receiving thread. It returns immediately, with an Err if no message is available yet.
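A polling loop built on try_recv might look like the sketch below. The helper name poll_for_message and the sleep durations are assumptions chosen for illustration; a real program would typically do useful work between polls instead of sleeping.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

// Hypothetical helper: polls the channel without blocking and returns the
// message together with the number of polls that found the channel empty.
fn poll_for_message() -> (String, u32) {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        tx.send(String::from("hello")).unwrap();
    });

    let mut empty_polls = 0;
    loop {
        match rx.try_recv() {
            Ok(msg) => return (msg, empty_polls),
            // Nothing has arrived yet: the thread is free to do other work.
            Err(TryRecvError::Empty) => {
                empty_polls += 1;
                thread::sleep(Duration::from_millis(10));
            }
            // Every transmitter was dropped and the channel is empty.
            Err(TryRecvError::Disconnected) => panic!("sender dropped without sending"),
        }
    }
}

fn main() {
    let (msg, empty_polls) = poll_for_message();
    println!("Got {:?} after {} empty polls", msg, empty_polls);
}
```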
Sending Multiple Values (Proof of Concurrency!)
Tweaking the example above, we have the sender send multiple values:

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    fn main() {
        let (tx, rx) = mpsc::channel();

        thread::spawn(move || {
            let vals = vec![
                String::from("hello"),
                String::from("from"),
                String::from("the"),
                String::from("thread"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                thread::sleep(Duration::from_millis(100));
            }
        });

        // A receiver can be used as an iterator! The loop ends once the
        // channel is closed.
        for recvd in rx {
            println!("Sender said {}", recvd);
        }
    }
Sending Multiple Values from Multiple Transmitters
Senders can be cloned so that a single receiver can listen to messages from multiple threads:

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    fn main() {
        let (tx1, rx) = mpsc::channel();
        let tx2 = mpsc::Sender::clone(&tx1);

        thread::spawn(move || {
            let vals = vec![
                String::from("Thread 1: hello"),
                String::from("Thread 1: from"),
                String::from("Thread 1: thread"),
                String::from("Thread 1: UNO"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                thread::sleep(Duration::from_millis(100));
            }
        });

        thread::spawn(move || {
            let vals = vec![
                String::from("Thread 2: hello"),
                String::from("Thread 2: from"),
                String::from("Thread 2: thread"),
                String::from("Thread 2: DOS"),
            ];

            for val in vals {
                tx2.send(val).unwrap();
                thread::sleep(Duration::from_millis(50));
            }
        });

        // A receiver can be used as an iterator! The loop ends once both
        // transmitters have been dropped.
        for recvd in rx {
            println!("Sender said {}", recvd);
        }
    }
Shared-State Concurrency
Above we saw concurrency via message communication. Now we'll look at concurrency via shared memory.
Using Mutexes to Allow Access to Data from One Thread at a Time
Concept: A mutex is a mechanism that allows only one thread to access data at any given time. To access the data, a thread must first signal that it wants access by asking to acquire the mutex's lock. When it's done, it has to give the lock back so that other threads can access the data.
Warning: A mutex cannot protect you from deadlocks! A deadlock occurs when an operation needs to lock two resources and two threads have each acquired one of the locks, causing them to wait for each other forever.
In the example below, we use a very simple mutex and comment on the different aspects of its use:

    use std::sync::Mutex;

    fn main() {
        let m = Mutex::new(5);

        // We wrap this in an inner scope so that the guard is dropped at the
        // end of the scope, releasing the lock for others to use.
        {
            let mut val = m
                // Acquire the lock. NOTE: this method blocks!
                .lock()
                // lock() returns a Result; its Ok value is a MutexGuard
                // wrapping the actual data.
                .unwrap();

            // The MutexGuard is a smart pointer, so we dereference it to
            // reach the data.
            *val += 1;
        }

        println!("m = {:?}", m);
    }
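Concept: A call to lock() returns an Err if another thread panicked while holding the lock. When a mutex is in such a state, it's said that the mutex is poisoned. Every later call to lock() on a poisoned mutex also returns an Err, although the data can still be reached through the error's into_inner() method.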
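Poisoning can be reproduced in a few lines. The sketch below is an illustration under stated assumptions: the helper name poison_and_recover is hypothetical, and Arc (the shared-ownership pointer used in the next subsection) is only there so that two threads can reach the same mutex.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: poisons a mutex on purpose, then reads the data
// anyway. Returns whether the mutex was poisoned and the recovered value.
fn poison_and_recover() -> (bool, i32) {
    let m = Arc::new(Mutex::new(5));
    let m2 = Arc::clone(&m);

    // This thread panics while it still holds the lock.
    let result = thread::spawn(move || {
        let _guard = m2.lock().unwrap();
        panic!("panicking while holding the lock");
    })
    .join();
    assert!(result.is_err());

    let poisoned = m.is_poisoned();

    // lock() now returns an Err, but the error still carries the guard.
    let value = match m.lock() {
        Ok(guard) => *guard,
        Err(poison_err) => *poison_err.into_inner(),
    };

    (poisoned, value)
}

fn main() {
    let (poisoned, value) = poison_and_recover();
    println!("poisoned = {}, value = {}", poisoned, value);
}
```

Whether it is safe to keep using data from a poisoned mutex depends on the program: the panicking thread may have left the data half-updated.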
Sharing a Mutex Between Multiple Threads
In this example, we share a number behind a mutex among 10 threads. Each thread increments the number. The mutex is wrapped in an Arc, an atomically reference-counted pointer, so that every thread can own a handle to it.

    use std::sync::{Arc, Mutex};
    use std::thread;

    fn main() {
        let counter = Arc::new(Mutex::new(0));
        let mut handles = vec![];

        for i in 0..10 {
            // Each thread gets its own Arc pointing to the same mutex.
            let counter = Arc::clone(&counter);
            handles.push(thread::spawn(move || {
                println!("Handle {} is running!", i);
                let mut val = counter.lock().unwrap();
                *val += 1;
            }));
        }

        // Wait for every thread before reading the final value.
        handles.into_iter().for_each(|h| h.join().unwrap());
        println!("Result: {}", counter.lock().unwrap());
    }
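Returning to the deadlock warning earlier in this section: one common way to avoid that situation is to make every thread acquire its locks in the same order. The sketch below assumes two hypothetical account balances, a and b; the helper name run_transfers and the amounts are made up for illustration.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: two threads move amounts between two balances.
// Returns the final (a, b) balances.
fn run_transfers() -> (i32, i32) {
    let a = Arc::new(Mutex::new(100));
    let b = Arc::new(Mutex::new(100));
    let mut handles = Vec::new();

    for amount in vec![30, -10] {
        let (a, b) = (Arc::clone(&a), Arc::clone(&b));
        handles.push(thread::spawn(move || {
            // Both threads lock `a` first and `b` second. If one thread
            // locked `b` first instead, each could end up holding one
            // lock while waiting forever for the other.
            let mut from = a.lock().unwrap();
            let mut to = b.lock().unwrap();
            *from -= amount;
            *to += amount;
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    let balances = (*a.lock().unwrap(), *b.lock().unwrap());
    balances
}

fn main() {
    println!("Final balances: {:?}", run_transfers());
}
```

The compiler does not enforce this ordering; it is a convention the program has to follow consistently.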