Chapter 16: Fearless Concurrency

Using Threads to Run Code Simultaneously

Waiting for all threads to finish

Spawning a thread returns a JoinHandle, which is an owned value that exposes the join method. When called, join blocks the calling (current) thread until the handled thread completes.

Using move Closures with threads

A move closure allows data to be used from one thread in another thread. It moves ownership of the data used to the thread's environment.

Using Message Passing to Transfer Data Between Threads

Rust's major abstraction for accomplishing message-sending concurrency is the channel.

A channel has two halves: a transmitter and a receiver. Rust's implementation allows for multiple producers and a single receiver/consumer, hence mpsc.

A channel is said to be closed if either half (sender or receiver) of a channel is dropped.

In the below code, we'll spawn a new thread that says hello to the main thread:


#![allow(unused)]
fn main() {
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
  let value = String::from("hello");
  tx.send(value).unwrap();
});

let received = rx.recv().unwrap();
println!("Transmitter said {}", received);
}

Some things to note from the example:

  • tx.send() returns a Result because it's possible that the receiving end has already been dropped.
  • tx.send(value) steals ownership! value will no longer be usable.
  • Once a sender thread has finished, calling recv() in the main thread will return an Error result, which indicates that no more values will be coming down from the receiver.

Tip: Instead of recv, you can also use try_recv, which does not block the receiving thread.

Sending Multiple Values (Proof of Concurrency!)

Tweaking the example above, the sender will send multiple values:


#![allow(unused)]
fn main() {
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
  let vals = vec![
    String::from("hello"),
    String::from("from"),
    String::from("the"),
    String::from("thread"),
  ];

  for val in vals {
    tx.send(val).unwrap();
    thread::sleep(Duration::from_millis(100));
  }
});

// A receiver is an Iterator!
for recvd in rx {
  println!("Sender said {}", recvd);
}
}

Sending Multiple Values from Multiple Transmitters

Senders can be cloned such that we can listen to messages from multiple threads:


#![allow(unused)]
fn main() {
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
let (tx1, rx) = mpsc::channel();
let tx2 = mpsc::Sender::clone(&tx1);

thread::spawn(move || {
  let vals = vec![
    String::from("Thread 1: hello"),
    String::from("Thread 1: from"),
    String::from("Thread 1: thread"),
    String::from("Thread 1: UNO"),
  ];

  for val in vals {
    tx1.send(val).unwrap();
    thread::sleep(Duration::from_millis(100));
  }
});

thread::spawn(move || {
  let vals = vec![
    String::from("Thread 2: hello"),
    String::from("Thread 2: from"),
    String::from("Thread 2: thread"),
    String::from("Thread 2: DOS"),
  ];

  for val in vals {
    tx2.send(val).unwrap();
    thread::sleep(Duration::from_millis(50));
  }
});

// A receiver is an Iterator!
for recvd in rx {
  println!("Sender said {}", recvd);
}
}

Shared-State Concurrency

Above we saw concurrency via message communication. Now we'll look at concurrency via shared memory.

Using Mutexes to Allow Access to Data from One Thread at a Time

Concept: A mutex is a mechanism that allows only one thread to access data at any given time. To access data, a thread has to signal that it wants the mutex's lock. When it's done, it have to give the lock back to allow other threads to access the data.

Warning: A mutex cannot protect you from deadlocks! A deadlock occurs when an operation needs to lock two resources and two threads have each acquired one of the locks, causing them to wait for each other forever.

In the below example, we'll use a super simple mutex and comment the different aspects of its use:


#![allow(unused)]
fn main() {
use std::sync::Mutex;
let m = Mutex::new(5);

// We'll wrap this in an inner scope so that the lock will be dropped,
// allowing others to use it
{
  let mut val = m
    // Get the lock. NOTE: This method blocks!
    .lock()
    // In rust, the Result returned by lock contains the actual data,
    // wrapped in a MutexGuard
    .unwrap();

  // The data itself is a smart pointer!
  *val += 1;
}

println!("m = {:?}", m);
}

Concept: A call to lock() will fail if another thread hold the lock has panicked. Once this happens, the mutex will never be free. When a mutex is in such a state, it's said that the mutex is poisoned.

Sharing a Mutex Between Multiple Threads

In this example, we share a number behind a mutex among 10 threads. Each thread will increment the number.

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
  let counter = Arc::new(Mutex::new(0));
  let mut handles = vec![];

  for i in 0..10 {
    let counter = Arc::clone(&counter);
    handles.push(thread::spawn(move || {
      println!("Handle {} is running!", i);
      let mut val = counter.lock().unwrap();
      *val += 1;
    }));
  }

  handles.into_iter().for_each(|h| h.join().unwrap());

  println!("Result: {}", counter.lock().unwrap());
}