Rust随笔(八)
Rust随笔(八)
今天来谈谈Rust里面的Sync和Send,在随笔七里面我们提到这两个概念,今天我们来详细的说一下。这两个trait是rust异步编程的主要基石之一。我们首先看看定义:
- Send
Send表示一个类型的所有权可以安全地在线程之间转移。 - Sync
Sync表示一个类型的不可变引用(&T)可以安全地在线程之间共享。 只有同时实现这两者,才能在多线程环境下共享数据,看着定义我们会感到有点疑惑,接下来我们用例子说明。
Send
为什么需要 Send trait? 在多线程环境下我们面临一个问题:当我们把数据从一个线程移动到另一个线程时,可能会发生什么? 首先,数据移动会引用计数的并发修改,如果使用非原子操作,多线程会导致计数错误;其次,多线程访问数据会导致内部状态的共享,导致多个线程访问同一内存。最后,某些类型依赖于特定线程的上下文,转移到其他线程会破坏这种依赖,导致内存被错误地释放或泄漏。 综上,我们需要Send去保证这些问题不会发生,以下我们用例子说明:
use std::thread;
use std::sync::{Arc, Mutex};
use std::rc::Rc;
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
fn main() {
println!("基本类型自动实现 Send");
let number = 42;
let message = String::from("Hello from thread!");
let data = vec![1, 2, 3, 4, 5];
println!("主线程创建数据:number={}, message=\"{}\", vec={:?}",
number, message, data);
// 这些类型都实现了 Send,可以安全转移所有权
let handle = thread::spawn(move || {
println!("子线程接收数据:");
println!("number = {}", number);
println!("message = {}", message);
println!("data = {:?}", data);
println!("所有权成功转移,原线程不再能访问这些数据");
});
handle.join().unwrap();
// println!("{}", message); // 错误,所有权已经转移
println!("Rc 不实现 Send(非线程安全)");
println!("Rc 内部使用非原子计数");
println!("如果两个线程同时克隆 Rc");
println!("线程 A: 读取 count=2 → 线程 B: 读取 count=2");
println!("线程 A: 设置 count=3 → 线程 B: 设置 count=3");
println!("结果:计数应该是 4,实际是 3!内存泄漏或重复释放!");
let rc_data = Rc::new(vec![1, 2, 3]);
println!("创建 Rc: {:?}", rc_data);
// 下面的代码会编译错误
// let handle = thread::spawn(move || {
// println!("尝试在子线程使用 Rc: {:?}", rc_data);
// });
println!("Arc 实现 Send(线程安全)");
let arc_data = Arc::new(vec![10, 20, 30]);
let mut handles = vec![];
println!("创建 Arc 并在 3 个线程间共享:");
for i in 0..3 {
let data = Arc::clone(&arc_data);
handles.push(thread::spawn(move || {
thread::sleep(Duration::from_millis(i * 10));
println!("线程 {}: 安全访问 {:?},引用计数: {}",
i, data, Arc::strong_count(&data));
}));
}
for handle in handles {
handle.join().unwrap();
}
}
通过以上例子,我们可以看到,Send主要保证以下几点:
- 防止了非线程安全的操作跨线程执行
- 将运行时可能的并发错误转换为编译时错误
- 没有运行时开销,所有检查在编译时完成
Sync
为什么需要 Sync trait? 在多线程环境下我们面临另一个问题:当多个线程同时访问同一数据时,可能会发生什么? 首先,多线程同时读写同一内存,其次,通过不可变引用修改数据,最后,非原子操作导致中间状态被观察到。 综上,我们需要Sync去保证这些问题不会发生,以下我们用例子说明:
use std::thread;
use std::sync::{Arc, Mutex, RwLock};
use std::cell::{Cell, RefCell};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::collections::HashMap;
fn main() {
println!("基本类型自动实现 Sync");
let number = Arc::new(42);
let message = Arc::new(String::from("共享消息"));
let data = Arc::new(vec![1, 2, 3, 4, 5]);
println!("创建共享数据(使用 Arc 包装)");
println!("number: {}", *number);
println!("message: {}", *message);
println!("data: {:?}\n", *data);
let mut handles = vec![];
// 多个线程同时持有同一数据的不可变引用
for i in 0..3 {
let num = Arc::clone(&number);
let msg = Arc::clone(&message);
let vec = Arc::clone(&data);
handles.push(thread::spawn(move || {
println!(" 线程 {} 读取:num={}, msg='{}', data={:?}",
i, *num, *msg, *vec);
}));
}
for handle in handles {
handle.join().unwrap();
}
println!("多个线程安全地共享了不可变引用");
println!("示例 2:Cell 和 RefCell 不实现 Sync");
println!("危险场景(如果 Cell 是 Sync):");
println!("线程 A: cell.set(10) // 非原子写入");
println!("线程 B: cell.set(20) // 同时写入");
println!("结果: 数据竞争,未定义行为");
struct UnsafeShared {
value: Cell<i32>,
}
// 下面的代码会编译错误,因为 Cell 不是 Sync
// let shared = Arc::new(UnsafeShared { value: Cell::new(0) });
// let shared_clone = Arc::clone(&shared);
// thread::spawn(move || {
// shared_clone.value.set(10); // 错误!Cell 不是 Sync
// });
println!("RefCell<T> 同样不是 Sync:");
println!("运行时借用检查不是线程安全的");
println!("borrow_mut() 的引用计数不是原子的");
println!("多线程访问会破坏借用规则");
println!("示例 3:Mutex 和 RwLock 实现 Sync\n");
// Mutex 使非 Sync 类型变得可共享
struct Counter {
count: Cell<i32>, // Cell 不是 Sync
}
// 但是 Mutex<Counter> 是 Sync!
let safe_counter = Arc::new(Mutex::new(Counter {
count: Cell::new(0),
}));
let mut handles = vec![];
println!("使用 Mutex 包装非 Sync 类型:");
for i in 0..5 {
let counter = Arc::clone(&safe_counter);
handles.push(thread::spawn(move || {
let mut guard = counter.lock().unwrap();
let old = guard.count.get();
guard.count.set(old + 1);
println!(" 线程 {} 增加计数: {} → {}", i, old, old + 1);
}));
}
for handle in handles {
handle.join().unwrap();
}
let final_count = safe_counter.lock().unwrap().count.get();
println!(" 最终计数: {}", final_count);
// RwLock 示例
println!("RwLock<T> 提供更细粒度的并发控制:");
println!("多个读者可以同时访问,写者独占访问");
let shared_data = Arc::new(RwLock::new(HashMap::new()));
// 初始化数据
{
let mut map = shared_data.write().unwrap();
map.insert("apple", 5);
map.insert("banana", 3);
println!("初始化数据: {:?}", *map);
}
let mut handles = vec![];
// 多个读线程
for i in 0..3 {
let data = Arc::clone(&shared_data);
handles.push(thread::spawn(move || {
let map = data.read().unwrap();
println!(" 读线程 {}: apple={:?}, banana={:?}",
i, map.get("apple"), map.get("banana"));
thread::sleep(Duration::from_millis(50));
}));
}
// 一个写线程
let data = Arc::clone(&shared_data);
handles.push(thread::spawn(move || {
thread::sleep(Duration::from_millis(25));
let mut map = data.write().unwrap();
map.insert("orange", 7);
println!(" 写线程: 添加 orange=7");
}));
for handle in handles {
handle.join().unwrap();
}
println!(" 最终数据: {:?}", *shared_data.read().unwrap());
println!("原始指针不是 Sync");
println!("*const T 和 *mut T 不实现 Sync,因为:");
println!("编译器无法验证指针的有效性");
println!("无法保证指向的内存的线程安全性");
println!("可能指向线程局部存储");
struct RawPointerWrapper {
ptr: *const i32, // 原始指针不是 Sync
}
// unsafe impl Sync for RawPointerWrapper {} // 需要手动实现(不安全)
println!("如果需要跨线程共享指针,应该使用:");
println!("Arc<T> - 安全的共享所有权");
println!("&T - 如果生命周期允许");
println!("AtomicPtr - 原子指针操作\n");
println!("Sync 的传递性规则");
// 所有字段都是 Sync → 自动实现 Sync
#[derive(Debug)]
struct SyncData {
id: u32, // u32 是 Sync
name: String, // String 是 Sync
shared: Arc<Mutex<Vec<i32>>>, // Arc<Mutex<T>> 是 Sync
}
// 包含非 Sync 字段 → 不能实现 Sync
struct NonSyncData {
id: u32,
cell: Cell<i32>, // Cell 不是 Sync!
}
println!("Sync 的自动推导:");
println!("SyncData: 所有字段都是 Sync");
println!("NonSyncData: 包含 Cell (非 Sync)");
let sync_data = Arc::new(SyncData {
id: 1,
name: String::from("可共享"),
shared: Arc::new(Mutex::new(vec![1, 2, 3])),
});
let mut handles = vec![];
for i in 0..3 {
let data = Arc::clone(&sync_data);
handles.push(thread::spawn(move || {
println!(" 线程 {} 访问 SyncData: id={}, name={}",
i, data.id, data.name);
}));
}
for handle in handles {
handle.join().unwrap();
}
}
通过以上例子,我们可以看到,Sync主要保证以下几点:
- Sync 确保多个线程可以安全地持有同一数据的不可变引用
- 提供了安全的内部可变性
- 不安全的共享在编译时就被阻止 所以,Sync和Send保证数据可以被多线程共享,那么我们在unsafe场景下该怎么做呢,见以下例子:
use std::thread;
use std::sync::{Arc, atomic::{AtomicUsize, AtomicPtr, Ordering}};
use std::ptr::NonNull;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::alloc::{alloc, dealloc, Layout};
fn main() {
println!("示例 1:错误的 unsafe 实现(仅作说明,已注释");
struct BadExample {
// 包含非线程安全的内部可变性
data: UnsafeCell<i32>,
}
// 错误!UnsafeCell 不应该直接实现 Sync
// unsafe impl Sync for BadExample {} // 永远不要这样做!
println!("为什么这是错误的:");
println!("1. UnsafeCell 提供内部可变性");
println!("2. 没有任何同步机制");
println!("3. 多线程访问会导致数据竞争");
println!("示例 2:原始指针的安全包装");
// 一个包含原始指针的类型
struct SafeWrapper<T> {
ptr: *mut T,
_marker: PhantomData<T>,
}
impl<T> SafeWrapper<T> {
fn new(value: T) -> Self {
let boxed = Box::new(value);
SafeWrapper {
ptr: Box::into_raw(boxed),
_marker: PhantomData,
}
}
fn get(&self) -> &T {
unsafe { &*self.ptr }
}
}
impl<T> Drop for SafeWrapper<T> {
fn drop(&mut self) {
unsafe {
let _ = Box::from_raw(self.ptr);
}
}
}
// 安全地实现 Send:
// - ptr 指向堆上的数据,可以跨线程传递
// - 我们拥有数据的所有权
unsafe impl<T: Send> Send for SafeWrapper<T> {}
// 安全地实现 Sync:
// - 只提供不可变访问
// - T 本身必须是 Sync
unsafe impl<T: Sync> Sync for SafeWrapper<T> {}
let wrapper = Arc::new(SafeWrapper::new(vec![1, 2, 3]));
let mut handles = vec![];
for i in 0..3 {
let w = Arc::clone(&wrapper);
handles.push(thread::spawn(move || {
println!(" 线程 {} 访问: {:?}", i, w.get());
}));
}
for handle in handles {
handle.join().unwrap();
}
}
从上面的例子我们可以知道,在unsafe场景下实现异步安全,我们应该做到:
- 仔细验证线程安全性
- 使用正确的内存顺序
- 文档说明安全保证
- 考虑使用现有的同步原语
那么,存不存在一种情况,有Send的时候没有Sync,有Sync的时候没有Send,在这些场景中,还能不能使用安全的多线程环境,如果能,那应该怎么做?如果不能,为什么?我们接着看:
有Send无Sync
我们来看这个例子:
use std::thread;
use std::sync::{Arc, Mutex};
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::marker::PhantomData;
use std::time::Duration;
use std::sync::mpsc;
fn main() {
example_send_not_sync();
}
fn example_send_not_sync() {
println!("\n## 示例 1:有 Send 无 Sync - Cell<T> 和 RefCell<T>\n");
// Cell 和 RefCell 实现 Send(当 T: Send)但不实现 Sync
// Cell 是 Send
println!("1️⃣ Cell<T> 可以移动到其他线程:");
let cell = Cell::new(42);
let handle = thread::spawn(move || {
println!(" 子线程接收 Cell,值: {}", cell.get());
cell.set(100);
println!(" 子线程修改后: {}", cell.get());
});
handle.join().unwrap();
// 2. Cell 不是 Sync
// 下面的代码会编译错误,因为 Cell 不是 Sync
// let shared_cell = Arc::new(Cell::new(0));
// let c = Arc::clone(&shared_cell);
// thread::spawn(move || {
// c.set(10); // 错误:Cell 不是 Sync
// });
println!("解决方案 A:使用 Mutex 包装 Cell");
let safe_cell = Arc::new(Mutex::new(Cell::new(0)));
let mut handles = vec![];
for i in 0..3 {
let cell = Arc::clone(&safe_cell);
handles.push(thread::spawn(move || {
let cell_guard = cell.lock().unwrap();
let old = cell_guard.get();
cell_guard.set(old + 1);
println!(" 线程 {} 修改 Cell: {} -> {}", i, old, old + 1);
}));
}
for handle in handles {
handle.join().unwrap();
}
println!(" 最终值: {}", safe_cell.lock().unwrap().get());
println!("解决方案 B:通过通道传递 RefCell");
#[derive(Debug)]
struct Data {
value: RefCell<Vec<i32>>,
}
impl Data {
fn new() -> Self {
Data {
value: RefCell::new(vec![1, 2, 3]),
}
}
fn process(&self) {
let mut v = self.value.borrow_mut();
v.push(v.len() as i32 + 1);
}
}
let (tx, rx) = mpsc::channel();
// 发送端:创建并发送数据
thread::spawn(move || {
let data = Data::new();
println!(" 发送线程创建数据: {:?}", data.value.borrow());
data.process();
println!(" 发送线程处理后: {:?}", data.value.borrow());
tx.send(data).unwrap();
});
// 接收端:接收并继续处理
let data = rx.recv().unwrap();
println!(" 主线程接收数据: {:?}", data.value.borrow());
data.process();
println!(" 主线程处理后: {:?}", data.value.borrow());
}
总结,Send 但非 Sync 的类型可以:
- 在线程间传递所有权(move)
- 使用 Mutex 实现安全共享
- 通过通道在线程间传递
有Sync无Send
我们看看这个例子:
use std::thread;
use std::sync::{Arc, Mutex};
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::marker::PhantomData;
use std::time::Duration;
use std::sync::mpsc;
fn main() {
example_sync_not_send();
}
fn example_sync_not_send() {
println!("\n## 示例 2:有 Sync 无 Send - 特殊的标记类型\n");
// 创建一个 Sync 但非 Send 的类型
// 这种情况比较罕见,通常涉及线程局部存储或特定的系统资源
println!("📌 创建一个 Sync 但非 Send 的类型:");
println!(" • Send: ❌ 不能移动到其他线程");
println!(" • Sync: ✅ 可以在线程间共享不可变引用");
println!(" • 场景: 绑定到特定线程但允许其他线程读取\n");
// 模拟一个绑定到创建线程的资源
struct ThreadBoundResource {
thread_id: std::thread::ThreadId,
data: i32,
// 使用 PhantomData 和 *const u8 使类型不是 Send
_phantom: PhantomData<*const u8>,
}
impl ThreadBoundResource {
fn new(data: i32) -> Self {
ThreadBoundResource {
thread_id: thread::current().id(),
data,
_phantom: PhantomData,
}
}
fn read(&self) -> i32 {
// 任何线程都可以读取
self.data
}
fn write(&mut self, value: i32) {
// 只有创建线程可以写入
assert_eq!(
self.thread_id,
thread::current().id(),
"只能在创建线程中修改!"
);
self.data = value;
}
fn get_thread_id(&self) -> std::thread::ThreadId {
self.thread_id
}
}
// 手动实现 Sync(允许共享不可变引用)
// 安全因为 read() 方法不修改状态
unsafe impl Sync for ThreadBoundResource {}
// 不实现 Send(不允许移动到其他线程)
// impl Send for ThreadBoundResource {} // 不实现!
println!("创建线程绑定资源:");
let resource = Arc::new(ThreadBoundResource::new(100));
println!(" 主线程创建资源,值: {}", resource.read());
println!(" 绑定到线程: {:?}", resource.get_thread_id());
// 2. 其他线程可以读取(通过 Arc 共享)
println!("其他线程可以读取(Sync):");
let mut handles = vec![];
for i in 0..3 {
let r = Arc::clone(&resource);
handles.push(thread::spawn(move || {
println!(" 线程 {} 读取值: {}, 资源绑定于: {:?}",
i, r.read(), r.get_thread_id());
}));
}
for handle in handles {
handle.join().unwrap();
}
// 3. 不能移动到其他线程
// 下面的代码会编译错误,因为 ThreadBoundResource 不是 Send
// let r = ThreadBoundResource::new(50);
// thread::spawn(move || {
// r.write(60); // 错误:ThreadBoundResource 不是 Send
// });
//实际应用:线程局部缓存系统
struct ThreadLocalCache {
creator_thread: std::thread::ThreadId,
cache: RefCell<Vec<String>>,
stats: Mutex<CacheStats>,
_marker: PhantomData<*const u8>, // 使其非 Send
}
#[derive(Default, Debug)]
struct CacheStats {
reads: usize,
writes: usize,
}
impl ThreadLocalCache {
fn new() -> Self {
ThreadLocalCache {
creator_thread: thread::current().id(),
cache: RefCell::new(Vec::new()),
stats: Mutex::new(CacheStats::default()),
_marker: PhantomData,
}
}
fn add(&self, item: String) {
assert_eq!(
self.creator_thread,
thread::current().id(),
"只能在创建线程中添加!"
);
self.cache.borrow_mut().push(item);
self.stats.lock().unwrap().writes += 1;
}
fn get_stats(&self) -> CacheStats {
let stats = self.stats.lock().unwrap();
stats.reads += 1;
CacheStats {
reads: stats.reads,
writes: stats.writes,
}
}
}
// 允许其他线程读取统计信息
unsafe impl Sync for ThreadLocalCache {}
let cache = Arc::new(ThreadLocalCache::new());
// 主线程添加数据
cache.add("item1".to_string());
cache.add("item2".to_string());
println!("\n 主线程添加了 2 个项目");
// 其他线程读取统计
let mut handles = vec![];
for i in 0..2 {
let c = Arc::clone(&cache);
handles.push(thread::spawn(move || {
thread::sleep(Duration::from_millis(10));
let stats = c.get_stats();
println!(" 线程 {} 读取统计: {:?}", i, stats);
}));
}
for handle in handles {
handle.join().unwrap();
}
}
总结,Sync 但非 Send 的类型可以:
- 在创建线程中修改
- 其他线程通过 Arc 读取
- 用于线程绑定的资源管理 在标准库中,MutexGuard 是 Sync 但非 Send,这防止了锁在错误的线程中释放
总结
Send和Sync是Rust并发编程安全性的两大基石,它们从不同维度保障了多线程环境下的内存安全。Send关注的是所有权转移——它保证一个类型可以安全地从一个线程移动到另一个线程;而 Sync 关注的是引用共享——它保证一个类型的不可变引用可以被多个线程同时持有。这两个 trait 的组合产生了四种模式:
- Send + Sync 的类型最为灵活,既可转移又可共享;
- Send + !Sync 的类型(如 Cell、RefCell)可以跨线程移动但不能共享,需要 Mutex 包装才能安全共享;
- !Send + Sync 的类型较为罕见,通常是绑定到特定线程的资源;
- 而 !Send + !Sync 的类型(如 Rc、原始指针)则完全不能跨线程使用。 理解这两个 trait 的关键在于记住一个简单原则:Send 管"移动",Sync 管"共享"。通过编译器的自动推导和类型系统的保证,Rust 将潜在的并发错误从运行时提前到了编译时,而 Arc、Mutex 等同步原语则提供了改变类型 Send/Sync 特性的能力。正是这套机制的存在,让 Rust 实现了"无畏并发"——在保证内存安全的同时,不牺牲性能。