跳至主要內容

Rust随笔(六)

Mr.Lexon大约 9 分钟rust

Rust随笔(六)

我们在随笔五中提到过RcRefCell的概念,这两个工具给复杂的变量引用场景给予了强大的支持,这里有个问题,我们首先看看下面这一段代码:

use std::rc::Rc;
use std::thread;

fn main() {
    let data = Rc::new(5);
    let data_clone = Rc::clone(&data);
    
    thread::spawn(move || {
        println!("Data from new thread: {}", data_clone);
    });
}

当我们把这段代码尝试cargo check的时候会发现:

error[E0277]: `Rc<i32>` cannot be sent between threads safely                                                                                                                                                                      
   --> src\main.rs:24:19
    |
24  |       thread::spawn(move || {
    |       ------------- ^------
    |       |             |
    |  _____|_____________within this `{closure@src\main.rs:24:19: 24:26}`
    | |     |
    | |     required by a bound introduced by this call
25  | |         println!("Data from new thread: {}", data_clone);
26  | |     });
    | |_____^ `Rc<i32>` cannot be sent between threads safely
    |
    = help: within `{closure@src\main.rs:24:19: 24:26}`, the trait `Send` is not implemented for `Rc<i32>`
note: required because it's used within this closure
   --> src\main.rs:24:19
    |
24  |     thread::spawn(move || {
    |                   ^^^^^^^
note: required by a bound in `spawn`
   --> C:\Users\user\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:727:8
    |
724 | pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    |        ----- required by a bound in this function
...
727 |     F: Send + 'static,
    |        ^^^^ required by this bound in `spawn`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `basic-test` (bin "basic-test") due to 1 previous error

这里提示data_clone必须实现Send + Sync,那这两个是什么呢?又应该怎么解决呢?请等我细细道来。

Send 和 Sync

我们分别看他们的定义:

  1. Send 如果一个类型实现了Sendtrait,意味着这个类型的所有权可以安全地从一个线程转移(移动/move)到另一个线程。你可以把这个类型的值想象成一个密封完好的包裹,可以安全地把它交给另一个快递员(另一个线程),由他来打开和处理,而你交出包裹后就不能再碰它了
  2. Sync 一个类型如果实现了 Sync trait,意味着这个类型的不可变引用 &T 可以安全地被多个线程共享和访问。这里的意思相当于每个快递员都可以看这个包裹,注意,这里只有访问权,没有所有权。 因为在线程模型上,main函数所执行的一切东西都可以视为单个线程内的工作,所以如果要做并发就必然创建一个线程出来,Send保证一个类型的值在所有权被完整地移动到另一个线程后,其本身的操作和最终的销毁过程在并发环境中是安全的。而Sync是保证一个类型的值在被多个线程通过共享引用 &T 同时访问时,不会发生数据竞争。

那么我们该如何让我们的类型实现这两个东西呢?这里有个工具:Arc<T>

什么是 Arc<T>

Arc<T> 是 Rust 标准库提供的一种智能指针,它的全称是 Atomically Reference Counted,即原子引用计数指针。你可以把它直接理解为 Rc<T>多线程安全版本。它的核心目的是允许多个所有者安全地在多个线程之间共享对同一份数据的所有权

回到我们的代码,我们尝试使用Arc<T>去代替Rc<T>:

use std::sync::Arc;  
use std::thread;  
  
fn main() {  
    let data = Arc::new(5);  
    let data_clone = Arc::clone(&data);  
  
    thread::spawn(move || {  
        println!("Data from new thread: {}", data_clone);  
    });  
}

我们再运行Cargo check发现,检查通过了:

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.21

我们运行试试看:

Data from new thread: 5

结果没问题。这意味着我们可以多线程使用data_clone了,也许你会想,如果我要修改data_clone怎么办?想这样加个RefCell行不行:

use std::cell::RefCell;  
use std::sync::Arc;  
use std::thread;  
use std::time::Duration;  
  
fn main() {  
      
    let data = Arc::new(RefCell::new(5));  
    let data_clone = Arc::clone(&data);  
  
      
    let handle = thread::spawn(move || {  
        let mut num = data_clone.borrow_mut();  
        *num += 1;  
        println!("新线程中修改后的值为: {}", *num);  
    });  
      
    handle.join().unwrap();  
      
    thread::sleep(Duration::from_secs(1));  
    println!("主线程中观察到的最终值: {}", *data.borrow());  
}

结果,cargo check出现了这样的问题:

error[E0277]: `RefCell<i32>` cannot be shared between threads safely                                                                                                                                                               
   --> src\main.rs:27:32
    |
27  |       let handle = thread::spawn(move || {
    |  __________________-------------_^
    | |                  |
    | |                  required by a bound introduced by this call
28  | |         let mut num = data_clone.borrow_mut();
29  | |         *num += 1;
30  | |         println!("新线程中修改后的值为: {}", *num);
31  | |     });
    | |_____^ `RefCell<i32>` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `RefCell<i32>`
    = note: if you want to do aliasing and mutation between multiple threads, use `std::sync::RwLock` instead
    = note: required for `Arc<RefCell<i32>>` to implement `Send`
note: required because it's used within this closure
   --> src\main.rs:27:32
    |
27  |     let handle = thread::spawn(move || {
    |                                ^^^^^^^
note: required by a bound in `spawn`
   --> C:\Users\user\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:727:8
    |
724 | pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    |        ----- required by a bound in this function
...
727 |     F: Send + 'static,
    |        ^^^^ required by this bound in `spawn`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `basic-test` (bin "basic-test") due to 1 previous error

这里表示RefCell也不满足Send + Sync,那该怎么办呢?这里我们引入另一个工具Mutex<T>

只读的共享数据 Arc<T>

在我们深入探讨Mutex<T>之前,有一个非常重要且常见的场景:数据在程序启动时被初始化一次,之后在整个运行期间只被多个线程读取,不再修改。

比如,一个昂贵的配置、一个机器学习模型、或者一大组只读的参数。对于这种情况,我们完全不需要 MutexRwLockArc<T> 本身就足以完美地解决这个问题。

  • Arc<T> 允许多个线程共享数据所有权。
  • 因为它只提供不可变引用 &T,所以读取操作天然就是线程安全的。
  • 这是最高效的并发数据共享方式,因为它在访问时没有任何锁的开销

例子:多个线程共享一个只读的配置

use std::sync::Arc;
use std::thread;
use std::collections::HashMap;

fn main() {
    let mut config = HashMap::new();
    config.insert("addr".to_string(), "127.0.0.1:8080".to_string());
    config.insert("threads".to_string(), "8".to_string());
    
    let shared_config = Arc::new(config);

    let mut handles = vec![];

    for i in 0..5 {
        let config_clone = Arc::clone(&shared_config);
        let handle = thread::spawn(move || {
            // 线程可以安全地读取共享数据,无需加锁
            let addr = config_clone.get("addr").unwrap();
            println!("线程 {} 读取到服务器地址: {}", i, addr);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

这非常高效和简单。但问题来了,如果我们不仅要读取,还想在程序运行期间动态地修改共享数据呢,那么我们正式开始介绍Mutex<T>

什么是Mutex<T>

Mutex<T> 是 Rust 标准库中的一个同步原语,它的名字是 Mutual Exclusion (互斥锁) 的缩写。

它的核心目的是保护共享数据,确保在任何时刻只有一个线程能够访问和修改这些数据Mutex<T> 提供了在多线程环境下的“内部可变性”,因此你可以把它理解为 RefCell<T>多线程安全版本

回到我们的代码,我们尝试使用Mutex<T>去代替RefCell<T>:

use std::sync::{Arc, Mutex};   
use std::thread;  
  
fn main() {  
  
    let data = Arc::new(Mutex::new(5));  
  
    let mut handles = vec![];  
  
    // 我们创建多个线程来演示并发修改  
    for i in 0..10 {  
  
        let data_clone = Arc::clone(&data);  
  
        let handle = thread::spawn(move || {  
            let mut num = data_clone.lock().unwrap();  
  
            // 成功获取锁后,`num` 是一个 MutexGuard,我们可以像 &mut i32 一样使用它  
            *num += 1;  
            println!("线程 {} 将值增加到 {}", i, *num);  
  
        });  
  
        handles.push(handle);  
    }  
      
    for handle in handles {  
        handle.join().unwrap();  
    }  
      
    println!("\n所有线程执行完毕。");  
    println!("最终结果: {}", *data.lock().unwrap());  
}

运行结果:

线程 0 将值增加到 6
线程 1 将值增加到 7
线程 2 将值增加到 8
线程 8 将值增加到 9
线程 7 将值增加到 10
线程 5 将值增加到 11
线程 6 将值增加到 12
线程 4 将值增加到 13
线程 9 将值增加到 14
所有线程执行完毕。15
最终结果: 15

这里发现,修改成功了,我们从上面的一个的错误信息得知我们可以用另外一个工具:RwLock<T>。现在来修改一下代码。

RwLock<T>

这是例子:

use std::sync::{Arc, RwLock};   
use std::thread;  
  
fn main() {  
  
    let data = Arc::new(RwLock::new(5));  
  
    let mut handles = vec![];  
  
    for i in 0..10 {  
  
        let data_clone = Arc::clone(&data);  
  
        let handle = thread::spawn(move || {  
            let mut num = data_clone.write().unwrap();  
            *num += 1;  
            println!("线程 {} 将值增加到 {}", i, *num);  
  
        });  
  
        handles.push(handle);  
    }  
      
    for handle in handles {  
        handle.join().unwrap();  
    }  
      
    println!("\n所有线程执行完毕。");  
    println!("最终结果: {}", *data.read().unwrap());  
}

运行结果:

线程 0 将值增加到 6
线程 1 将值增加到 7
线程 9 将值增加到 8
线程 3 将值增加到 9
线程 2 将值增加到 10
线程 5 将值增加到 11
线程 6 将值增加到 12
线程 7 将值增加到 13
线程 8 将值增加到 14
所有线程执行完毕。15
最终结果: 15

在结果上,貌似没有什么区别,那问题来了,RwLock<T>Mutex<T>到底有什么区别呢?我们先看看RwLock<T>的定义: RwLock<T> 是 Rust 标准库中的另一种同步原语,它的全称是 Read-Write Lock (读写锁)。 它是一种比 Mutex<T> 更为精细的锁,因为它区分了读取操作和写入操作。它的核心设计目标是在“读多写少”的场景下提供比 Mutex 更高的并发性能。 那么 RwLock<T>Mutex<T>应该怎么选择呢?可以参考一下这个:

  • 使用 RwLock<T> 的场景
    • 当你的共享数据被读取的频率远高于被写入的频率时(“读多写少”)。
    • 优点:在这种场景下,多个读取者可以并行执行,大大提高了程序的并发度和吞吐量。
    • 例子:一个很少变动的全局配置、一个需要被多个线程查询的缓存系统。
  • 使用 Mutex<T> 的场景
    • 当你的数据被读取和写入的频率差不多,或者写入非常频繁时。在这种情况下,RwLock 管理读写两种状态的额外开销可能会使其比结构更简单的 Mutex 还要慢。
    • 当你只想用最简单的锁来保证互斥,而不需要区分读写时。

潜在缺点RwLock 的一个经典理论问题是写者饥饿 (Writer Starvation)。如果读锁的请求一直不断地到来,那么等待获取写锁的线程可能会永远等待下去。不过 Rust 标准库中的 RwLock 实现包含了一些机制来缓解这个问题。

总结

当我们需要在多线程环境下共享数据时:

  1. 如果数据在初始化后是只读的,那么使用 Arc<T> 就是最简单、最高效的选择。
  2. 如果数据需要被读取和写入,那么我们可以采用 Arc<Mutex<T>> 的组合来解决这个问题。
  3. 进一步地,如果数据是“读多写少”的,可以考虑使用 Arc<RwLock<T>> 来获得更好的并发性能。
上次编辑于:
贡献者: Lexon