跳至主要內容

Rust随笔(八)

Mr.Lexon大约 13 分钟rust

Rust随笔(八)

今天来谈谈Rust里面的Sync和Send,在随笔七里面我们提到这两个概念,今天我们来详细的说一下。这两个trait是rust异步编程的主要基石之一。我们首先看看定义:

  1. Send Send 表示一个类型的所有权可以安全地在线程之间转移。
  2. Sync Sync 表示一个类型的不可变引用&T)可以安全地在线程之间共享。 只有同时实现这两者,才能在多线程环境下共享数据,看着定义我们会感到有点疑惑,接下来我们用例子说明。

Send

为什么需要 Send trait? 在多线程环境下我们面临一个问题:当我们把数据从一个线程移动到另一个线程时,可能会发生什么? 首先,数据移动会引用计数的并发修改,如果使用非原子操作,多线程会导致计数错误;其次,多线程访问数据会导致内部状态的共享,导致多个线程访问同一内存。最后,某些类型依赖于特定线程的上下文,转移到其他线程会破坏这种依赖,导致内存被错误地释放或泄漏。 综上,我们需要Send去保证这些问题不会发生,以下我们用例子说明:

use std::thread;
use std::sync::{Arc, Mutex};
use std::rc::Rc;
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

fn main() {
    
    println!("基本类型自动实现 Send");
    
    let number = 42;
    let message = String::from("Hello from thread!");
    let data = vec![1, 2, 3, 4, 5];
    
    println!("主线程创建数据:number={}, message=\"{}\", vec={:?}", 
             number, message, data);
    
    // 这些类型都实现了 Send,可以安全转移所有权
    let handle = thread::spawn(move || {
        println!("子线程接收数据:");
        println!("number = {}", number);
        println!("message = {}", message);
        println!("data = {:?}", data);
        println!("所有权成功转移,原线程不再能访问这些数据");
    });
    
    handle.join().unwrap();
    // println!("{}", message); // 错误,所有权已经转移
    
    println!("Rc 不实现 Send(非线程安全)");
    println!("Rc 内部使用非原子计数");
    
    println!("如果两个线程同时克隆 Rc");
    println!("线程 A: 读取 count=2  →  线程 B: 读取 count=2");
    println!("线程 A: 设置 count=3  →  线程 B: 设置 count=3");
    println!("结果:计数应该是 4,实际是 3!内存泄漏或重复释放!");
    
    let rc_data = Rc::new(vec![1, 2, 3]);
    println!("创建 Rc: {:?}", rc_data);
    
    // 下面的代码会编译错误
    // let handle = thread::spawn(move || {
    //     println!("尝试在子线程使用 Rc: {:?}", rc_data);
    // });
    
    println!("Arc 实现 Send(线程安全)");
    
    let arc_data = Arc::new(vec![10, 20, 30]);
    let mut handles = vec![];
    
    println!("创建 Arc 并在 3 个线程间共享:");
    for i in 0..3 {
        let data = Arc::clone(&arc_data);
        handles.push(thread::spawn(move || {
            thread::sleep(Duration::from_millis(i * 10));
            println!("线程 {}: 安全访问 {:?},引用计数: {}", 
                     i, data, Arc::strong_count(&data));
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

通过以上例子,我们可以看到,Send主要保证以下几点:

  1. 防止了非线程安全的操作跨线程执行
  2. 将运行时可能的并发错误转换为编译时错误
  3. 没有运行时开销,所有检查在编译时完成

Sync

为什么需要 Sync trait? 在多线程环境下我们面临另一个问题:当多个线程同时访问同一数据时,可能会发生什么? 首先,多线程同时读写同一内存,其次,通过不可变引用修改数据,最后,非原子操作导致中间状态被观察到。 综上,我们需要Sync去保证这些问题不会发生,以下我们用例子说明:

use std::thread;
use std::sync::{Arc, Mutex, RwLock};
use std::cell::{Cell, RefCell};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::collections::HashMap;

fn main() {
    println!("基本类型自动实现 Sync");
    
    let number = Arc::new(42);
    let message = Arc::new(String::from("共享消息"));
    let data = Arc::new(vec![1, 2, 3, 4, 5]);
    
    println!("创建共享数据(使用 Arc 包装)");
    println!("number: {}", *number);
    println!("message: {}", *message);
    println!("data: {:?}\n", *data);
    
    let mut handles = vec![];
    
    // 多个线程同时持有同一数据的不可变引用
    for i in 0..3 {
        let num = Arc::clone(&number);
        let msg = Arc::clone(&message);
        let vec = Arc::clone(&data);
        
        handles.push(thread::spawn(move || {
            println!("  线程 {} 读取:num={}, msg='{}', data={:?}", 
                     i, *num, *msg, *vec);
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("多个线程安全地共享了不可变引用");
    
    println!("示例 2:Cell 和 RefCell 不实现 Sync");
    
    println!("危险场景(如果 Cell 是 Sync):");
    println!("线程 A: cell.set(10)  // 非原子写入");
    println!("线程 B: cell.set(20)  // 同时写入");
    println!("结果: 数据竞争,未定义行为");
    
    struct UnsafeShared {
        value: Cell<i32>,
    }
    
    // 下面的代码会编译错误,因为 Cell 不是 Sync
    // let shared = Arc::new(UnsafeShared { value: Cell::new(0) });
    // let shared_clone = Arc::clone(&shared);
    // thread::spawn(move || {
    //     shared_clone.value.set(10);  // 错误!Cell 不是 Sync
    // });
    
    println!("RefCell<T> 同样不是 Sync:");
    println!("运行时借用检查不是线程安全的");
    println!("borrow_mut() 的引用计数不是原子的");
    println!("多线程访问会破坏借用规则");
    
    println!("示例 3:Mutex 和 RwLock 实现 Sync\n");
    
    // Mutex 使非 Sync 类型变得可共享
    struct Counter {
        count: Cell<i32>,  // Cell 不是 Sync
    }
    
    // 但是 Mutex<Counter> 是 Sync!
    let safe_counter = Arc::new(Mutex::new(Counter {
        count: Cell::new(0),
    }));
    
    let mut handles = vec![];
    
    println!("使用 Mutex 包装非 Sync 类型:");
    for i in 0..5 {
        let counter = Arc::clone(&safe_counter);
        handles.push(thread::spawn(move || {
            let mut guard = counter.lock().unwrap();
            let old = guard.count.get();
            guard.count.set(old + 1);
            println!("  线程 {} 增加计数: {} → {}", i, old, old + 1);
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    let final_count = safe_counter.lock().unwrap().count.get();
    println!("  最终计数: {}", final_count);
    
    // RwLock 示例
    println!("RwLock<T> 提供更细粒度的并发控制:");
    println!("多个读者可以同时访问,写者独占访问");
    
    let shared_data = Arc::new(RwLock::new(HashMap::new()));
    
    // 初始化数据
    {
        let mut map = shared_data.write().unwrap();
        map.insert("apple", 5);
        map.insert("banana", 3);
        println!("初始化数据: {:?}", *map);
    }
    
    let mut handles = vec![];
    
    // 多个读线程
    for i in 0..3 {
        let data = Arc::clone(&shared_data);
        handles.push(thread::spawn(move || {
            let map = data.read().unwrap();
            println!("  读线程 {}: apple={:?}, banana={:?}", 
                     i, map.get("apple"), map.get("banana"));
            thread::sleep(Duration::from_millis(50));
        }));
    }
    
    // 一个写线程
    let data = Arc::clone(&shared_data);
    handles.push(thread::spawn(move || {
        thread::sleep(Duration::from_millis(25));
        let mut map = data.write().unwrap();
        map.insert("orange", 7);
        println!("  写线程: 添加 orange=7");
    }));
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("  最终数据: {:?}", *shared_data.read().unwrap());
    
    println!("原始指针不是 Sync");
    
    println!("*const T 和 *mut T 不实现 Sync,因为:");
    println!("编译器无法验证指针的有效性");
    println!("无法保证指向的内存的线程安全性");
    println!("可能指向线程局部存储");
    
    struct RawPointerWrapper {
        ptr: *const i32,  // 原始指针不是 Sync
    }
    
    // unsafe impl Sync for RawPointerWrapper {}  // 需要手动实现(不安全)
    
    println!("如果需要跨线程共享指针,应该使用:");
    println!("Arc<T> - 安全的共享所有权");
    println!("&T - 如果生命周期允许");
    println!("AtomicPtr - 原子指针操作\n");
    
    println!("Sync 的传递性规则");
    
    // 所有字段都是 Sync → 自动实现 Sync
    #[derive(Debug)]
    struct SyncData {
        id: u32,                      // u32 是 Sync
        name: String,                 // String 是 Sync
        shared: Arc<Mutex<Vec<i32>>>, // Arc<Mutex<T>> 是 Sync
    }
    
    // 包含非 Sync 字段 → 不能实现 Sync
    struct NonSyncData {
        id: u32,
        cell: Cell<i32>,  // Cell 不是 Sync!
    }
    
    println!("Sync 的自动推导:");
    println!("SyncData: 所有字段都是 Sync");
    println!("NonSyncData: 包含 Cell (非 Sync)");
    
    let sync_data = Arc::new(SyncData {
        id: 1,
        name: String::from("可共享"),
        shared: Arc::new(Mutex::new(vec![1, 2, 3])),
    });
    
    let mut handles = vec![];
    for i in 0..3 {
        let data = Arc::clone(&sync_data);
        handles.push(thread::spawn(move || {
            println!("  线程 {} 访问 SyncData: id={}, name={}", 
                     i, data.id, data.name);
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

通过以上例子,我们可以看到,Sync主要保证以下几点:

  1. Sync 确保多个线程可以安全地持有同一数据的不可变引用
  2. 提供了安全的内部可变性
  3. 不安全的共享在编译时就被阻止 所以,Sync和Send保证数据可以被多线程共享,那么我们在unsafe场景下该怎么做呢,见以下例子:
use std::thread;
use std::sync::{Arc, atomic::{AtomicUsize, AtomicPtr, Ordering}};
use std::ptr::NonNull;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::alloc::{alloc, dealloc, Layout};

fn main() {
    println!("示例 1:错误的 unsafe 实现(仅作说明,已注释");
    
    struct BadExample {
        // 包含非线程安全的内部可变性
        data: UnsafeCell<i32>,
    }
    
    // 错误!UnsafeCell 不应该直接实现 Sync
    // unsafe impl Sync for BadExample {}  // 永远不要这样做!
    
    println!("为什么这是错误的:");
    println!("1. UnsafeCell 提供内部可变性");
    println!("2. 没有任何同步机制");
    println!("3. 多线程访问会导致数据竞争");
    
    
    println!("示例 2:原始指针的安全包装");
    
    // 一个包含原始指针的类型
    struct SafeWrapper<T> {
        ptr: *mut T,
        _marker: PhantomData<T>,
    }
    
    impl<T> SafeWrapper<T> {
        fn new(value: T) -> Self {
            let boxed = Box::new(value);
            SafeWrapper {
                ptr: Box::into_raw(boxed),
                _marker: PhantomData,
            }
        }
        
        fn get(&self) -> &T {
            unsafe { &*self.ptr }
        }
    }
    
    impl<T> Drop for SafeWrapper<T> {
        fn drop(&mut self) {
            unsafe {
                let _ = Box::from_raw(self.ptr);
            }
        }
    }
    
    // 安全地实现 Send:
    // - ptr 指向堆上的数据,可以跨线程传递
    // - 我们拥有数据的所有权
    unsafe impl<T: Send> Send for SafeWrapper<T> {}
    
    // 安全地实现 Sync:
    // - 只提供不可变访问
    // - T 本身必须是 Sync
    unsafe impl<T: Sync> Sync for SafeWrapper<T> {}
    
    
    let wrapper = Arc::new(SafeWrapper::new(vec![1, 2, 3]));
    let mut handles = vec![];
    
    for i in 0..3 {
        let w = Arc::clone(&wrapper);
        handles.push(thread::spawn(move || {
            println!("  线程 {} 访问: {:?}", i, w.get());
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

从上面的例子我们可以知道,在unsafe场景下实现异步安全,我们应该做到:

  1. 仔细验证线程安全性
  2. 使用正确的内存顺序
  3. 文档说明安全保证
  4. 考虑使用现有的同步原语

那么,存不存在一种情况,有Send的时候没有Sync,有Sync的时候没有Send,在这些场景中,还能不能使用安全的多线程环境,如果能,那应该怎么做?如果不能,为什么?我们接着看:

有Send无Sync

我们来看这个例子:

use std::thread;
use std::sync::{Arc, Mutex};
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::marker::PhantomData;
use std::time::Duration;
use std::sync::mpsc;

fn main() {
    example_send_not_sync();
}

fn example_send_not_sync() {
    println!("\n## 示例 1:有 Send 无 Sync - Cell<T> 和 RefCell<T>\n");
    
    // Cell 和 RefCell 实现 Send(当 T: Send)但不实现 Sync
    
    // Cell 是 Send
    println!("1️⃣ Cell<T> 可以移动到其他线程:");
    let cell = Cell::new(42);
    
    let handle = thread::spawn(move || {
        println!("  子线程接收 Cell,值: {}", cell.get());
        cell.set(100);
        println!("  子线程修改后: {}", cell.get());
    });
    handle.join().unwrap();
    
    // 2. Cell 不是 Sync
    
    // 下面的代码会编译错误,因为 Cell 不是 Sync
    // let shared_cell = Arc::new(Cell::new(0));
    // let c = Arc::clone(&shared_cell);
    // thread::spawn(move || {
    //     c.set(10);  // 错误:Cell 不是 Sync
    // });
    
    println!("解决方案 A:使用 Mutex 包装 Cell");
    let safe_cell = Arc::new(Mutex::new(Cell::new(0)));
    let mut handles = vec![];
    
    for i in 0..3 {
        let cell = Arc::clone(&safe_cell);
        handles.push(thread::spawn(move || {
            let cell_guard = cell.lock().unwrap();
            let old = cell_guard.get();
            cell_guard.set(old + 1);
            println!("  线程 {} 修改 Cell: {} -> {}", i, old, old + 1);
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("  最终值: {}", safe_cell.lock().unwrap().get());
    
    println!("解决方案 B:通过通道传递 RefCell");
    
    #[derive(Debug)]
    struct Data {
        value: RefCell<Vec<i32>>,
    }
    
    impl Data {
        fn new() -> Self {
            Data {
                value: RefCell::new(vec![1, 2, 3]),
            }
        }
        
        fn process(&self) {
            let mut v = self.value.borrow_mut();
            v.push(v.len() as i32 + 1);
        }
    }
    
    let (tx, rx) = mpsc::channel();
    
    // 发送端:创建并发送数据
    thread::spawn(move || {
        let data = Data::new();
        println!("  发送线程创建数据: {:?}", data.value.borrow());
        data.process();
        println!("  发送线程处理后: {:?}", data.value.borrow());
        tx.send(data).unwrap();
    });
    
    // 接收端:接收并继续处理
    let data = rx.recv().unwrap();
    println!("  主线程接收数据: {:?}", data.value.borrow());
    data.process();
    println!("  主线程处理后: {:?}", data.value.borrow());
}

总结,Send 但非 Sync 的类型可以:

  1. 在线程间传递所有权(move)
  2. 使用 Mutex 实现安全共享
  3. 通过通道在线程间传递

有Sync无Send

我们看看这个例子:

use std::thread;
use std::sync::{Arc, Mutex};
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::marker::PhantomData;
use std::time::Duration;
use std::sync::mpsc;

fn main() {
    example_sync_not_send();
}

fn example_sync_not_send() {
    println!("\n## 示例 2:有 Sync 无 Send - 特殊的标记类型\n");
    
    // 创建一个 Sync 但非 Send 的类型
    // 这种情况比较罕见,通常涉及线程局部存储或特定的系统资源
    
    println!("📌 创建一个 Sync 但非 Send 的类型:");
    println!("  • Send: ❌ 不能移动到其他线程");
    println!("  • Sync: ✅ 可以在线程间共享不可变引用");
    println!("  • 场景: 绑定到特定线程但允许其他线程读取\n");
    
    // 模拟一个绑定到创建线程的资源
    struct ThreadBoundResource {
        thread_id: std::thread::ThreadId,
        data: i32,
        // 使用 PhantomData 和 *const u8 使类型不是 Send
        _phantom: PhantomData<*const u8>,
    }
    
    impl ThreadBoundResource {
        fn new(data: i32) -> Self {
            ThreadBoundResource {
                thread_id: thread::current().id(),
                data,
                _phantom: PhantomData,
            }
        }
        
        fn read(&self) -> i32 {
            // 任何线程都可以读取
            self.data
        }
        
        fn write(&mut self, value: i32) {
            // 只有创建线程可以写入
            assert_eq!(
                self.thread_id,
                thread::current().id(),
                "只能在创建线程中修改!"
            );
            self.data = value;
        }
        
        fn get_thread_id(&self) -> std::thread::ThreadId {
            self.thread_id
        }
    }
    
    // 手动实现 Sync(允许共享不可变引用)
    // 安全因为 read() 方法不修改状态
    unsafe impl Sync for ThreadBoundResource {}
    // 不实现 Send(不允许移动到其他线程)
    // impl Send for ThreadBoundResource {}  // 不实现!
    
    println!("创建线程绑定资源:");
    let resource = Arc::new(ThreadBoundResource::new(100));
    println!("  主线程创建资源,值: {}", resource.read());
    println!("  绑定到线程: {:?}", resource.get_thread_id());
    
    // 2. 其他线程可以读取(通过 Arc 共享)
    println!("其他线程可以读取(Sync):");
    let mut handles = vec![];
    
    for i in 0..3 {
        let r = Arc::clone(&resource);
        handles.push(thread::spawn(move || {
            println!("  线程 {} 读取值: {}, 资源绑定于: {:?}", 
                     i, r.read(), r.get_thread_id());
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    // 3. 不能移动到其他线程
    
    // 下面的代码会编译错误,因为 ThreadBoundResource 不是 Send
    // let r = ThreadBoundResource::new(50);
    // thread::spawn(move || {
    //     r.write(60);  // 错误:ThreadBoundResource 不是 Send
    // });

    //实际应用:线程局部缓存系统
    
    struct ThreadLocalCache {
        creator_thread: std::thread::ThreadId,
        cache: RefCell<Vec<String>>,
        stats: Mutex<CacheStats>,
        _marker: PhantomData<*const u8>,  // 使其非 Send
    }
    
    #[derive(Default, Debug)]
    struct CacheStats {
        reads: usize,
        writes: usize,
    }
    
    impl ThreadLocalCache {
        fn new() -> Self {
            ThreadLocalCache {
                creator_thread: thread::current().id(),
                cache: RefCell::new(Vec::new()),
                stats: Mutex::new(CacheStats::default()),
                _marker: PhantomData,
            }
        }
        
        fn add(&self, item: String) {
            assert_eq!(
                self.creator_thread,
                thread::current().id(),
                "只能在创建线程中添加!"
            );
            self.cache.borrow_mut().push(item);
            self.stats.lock().unwrap().writes += 1;
        }
        
        fn get_stats(&self) -> CacheStats {
            let stats = self.stats.lock().unwrap();
            stats.reads += 1;
            CacheStats {
                reads: stats.reads,
                writes: stats.writes,
            }
        }
    }
    
    // 允许其他线程读取统计信息
    unsafe impl Sync for ThreadLocalCache {}
    
    let cache = Arc::new(ThreadLocalCache::new());
    
    // 主线程添加数据
    cache.add("item1".to_string());
    cache.add("item2".to_string());
    println!("\n  主线程添加了 2 个项目");
    
    // 其他线程读取统计
    let mut handles = vec![];
    for i in 0..2 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            let stats = c.get_stats();
            println!("  线程 {} 读取统计: {:?}", i, stats);
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
}

总结,Sync 但非 Send 的类型可以:

  1. 在创建线程中修改
  2. 其他线程通过 Arc 读取
  3. 用于线程绑定的资源管理 在标准库中,MutexGuard 是 Sync 但非 Send,这防止了锁在错误的线程中释放

总结

Send和Sync是Rust并发编程安全性的两大基石,它们从不同维度保障了多线程环境下的内存安全。Send关注的是所有权转移——它保证一个类型可以安全地从一个线程移动到另一个线程;而 Sync 关注的是引用共享——它保证一个类型的不可变引用可以被多个线程同时持有。这两个 trait 的组合产生了四种模式:

  1. Send + Sync 的类型最为灵活,既可转移又可共享;
  2. Send + !Sync 的类型(如 Cell、RefCell)可以跨线程移动但不能共享,需要 Mutex 包装才能安全共享;
  3. !Send + Sync 的类型较为罕见,通常是绑定到特定线程的资源;
  4. 而 !Send + !Sync 的类型(如 Rc、原始指针)则完全不能跨线程使用。 理解这两个 trait 的关键在于记住一个简单原则:Send 管"移动",Sync 管"共享"。通过编译器的自动推导和类型系统的保证,Rust 将潜在的并发错误从运行时提前到了编译时,而 Arc、Mutex 等同步原语则提供了改变类型 Send/Sync 特性的能力。正是这套机制的存在,让 Rust 实现了"无畏并发"——在保证内存安全的同时,不牺牲性能。
上次编辑于:
贡献者: lexon