理解 Rust 异步编程:Future、Pin、Waker 与 Poll 的本质
理解Rust异步编程:从Send/Sync到Future、Waker、Poll
我们在随笔七和随笔八中提到过,异步编程的本质就是在多个任务共享一个时间段,并且我们还介绍了什么是Send&Sync,并表明了其在rust编程中的地位,那么今天,我们进一步深入Rust异步编程,了解Rust异步编程靠的是什么维持,需要什么工具去支撑,在Send和Sync的基础上,如何保证并发安全? 这是一个新系列,并打算开展3篇内容介绍,以下是系列大纲:
- 《理解 Rust 异步编程:从Send/Sync到Future、Waker、Poll》着重介绍Rust异步编程的基本工具和基本的类型
- 《理解 Rust 异步编程:深入Tokio,理解Rust异步运行时的灵魂》着重介绍Rust异步编程中的异步runtime,并且深入研究Tokio的基本原理
- 《理解 Rust 异步编程:构建一个高并发事件系统,从 Tokio 基础到完整限速框架》在最后一篇我们用上面两篇的知识构建一个高性能应用 通过这个系列,我们将从语言抽象到运行时实现,逐步理解 Rust 异步的安全性来源、性能边界,以及它在“高并发”场景下的真实力量。 那么,在进入 runtime 之前,我们首先得问:
async函数到底是什么?- 为什么它返回的是
Future? Pin到底在保护什么?Waker是怎么让任务“醒过来”的? 第一篇,我们将回答这些问题——从Future开始,揭开 Rust 异步模型的底层逻辑。
什么是Future
我们在进行异步编程的时候,总会出现一个高频的关键字——async,我们来看这个简单的例子
async fn foo(a:i32,b:i32) -> i32{
a + b
}
#[tokio::main]
async fn main(){
let a = foo(1,2);
println!("{}",a.await);
}当我们把await去掉,我们在编译器提示中会看到这么一个东西(省略了一些编译器生成的细节):
fut 的类型: impl core::Future::Future<Output = i32>也就是说,这个函数并没有直接返回 i32,而是返回了一个实现了 Future trait 的匿名类型。如果我们用 cargo expand 展开 async fn 的实际生成代码,会看到类似这样的结构(伪代码):
fn foo(a: i32, b: i32) -> impl Future<Output = i32> {
async move {
a + b
}
}或者更进一步地说,它其实对应这样一个状态机:
use std::Future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct FooFuture {
a: i32,
b: i32,
state: State,
}
enum State {
Start,
Done,
}
impl Future for FooFuture {
type Output = i32;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.state {
State::Start => {
self.state = State::Done;
Poll::Ready(self.a + self.b)
}
State::Done => panic!("polled after completion"),
}
}
}也就是说,当你写下 async fn 时,Rust 编译器并不会创建协程或线程, 而是在编译期生成一个实现了 Future 的状态机类型。
每个
await都是一个状态分支;
每次poll都是状态机的一次推进;
当状态抵达Ready,任务才算真正完成。
那么Future到底是什么呢,我们来看源码:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}他是一个trait,有一个定义好的Output和一个poll函数签名组成。从字面意义上面看,这个trait简单的很,那么实际上它起到什么作用呢?我们先按下不表,我们先看看这里定义到的两个东西Pin和Poll<T>。
Pin?图钉?
我们在Future看到一个这么一个东西Pin,从字面意义上看就是一个图钉,那么在实际上有什么作用呢?我们先看看他的源码:
pub struct Pin<Ptr> {
pub __pointer: Ptr,
}从定义上看,Pin似乎只是把一个指针包了一层壳,看起来“平平无奇”,但它背后解决的是一个极其重要的安全问题——如何在异步状态机中保证引用不会因为内存移动而失效。
为什么需要Pin
要理解 Pin,我们得先看 Rust 的另一个“默认假设”:Rust 中的所有值默认都可以安全地移动(Move语义)这在普通代码中没有问题:
struct Point {
x: i32,
y: i32
}
fn main() {
let mut p = Point { x: 1, y: 2 };
let q = p;
// 移动所有权,没问题
println!("{}", q.x);
}但一旦进入异步函数或生成器的世界,这就出问题了。
因为编译器会把 async fn 编译成一个状态机,而状态机里可能保存了对自身局部变量的引用,在Future里面就体现了这一点:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;如果这个Future在执行过程中被移动(例如在不同线程的任务队列之间调度),那结构体里的引用self可能就指向了一块被移动过的内存——直接悬空引用。 为了解决这个问题,Rust引入了Pin。所以一旦某个值被Pin包裹,就保证它在内存中不会再被移动。所以我们会看这个函数签名:
self: Pin<&mut Self>这一部分意味着:
- 每次
poll时,runtime 只能通过一个固定地址去访问这个 Future; - 即使它内部保存着自引用(如
&x),也不会因为被移动而失效; - 这样编译器就能在类型层面保证异步状态机的内存安全。
Unpin
那是不是所有被Pin包裹住的类型都不能移动了?当然不是,有一些类型比如:i64、i32、string等等他是可以安全移动的(因为没有自引用^1)。所以Rust定义了一个辅助traitUnpin。这些安全移动的类型都实现了Unpin,只有那些存在自引用、或依赖固定内存地址的类型(例如Future、Generator)才会是 !Unpin,其它普通类型默认都是 Unpin。举个例子:
use std::pin::Pin;
fn main() {
let mut x = 10;
let mut pinned = Pin::new(&mut x);
*pinned.as_mut() = 20; // ✅ OK
println!("x = {}", x);
}i32 默认实现了 Unpin,所以 Pin<&mut i32> 基本等于 &mut i32,Pin 在这里不会阻止任何移动操作。而对那些没有实现 Unpin 的类型来说,Pin 的限制就会生效。 最典型的例子就是我们前面提到的 Future 类型:
use std::Future::Future;
async fn example() {
let x = 5;
println!("{}", x);
}
fn main() {
let fut = example();
// fut 的类型是 impl Future<Output = ()>
// 它默认是 !Unpin(不能安全移动)
}Future 通常会在内部保存对自身的引用,例如编译器在生成状态机时可能会创建像这样的结构:
struct ExampleFuture<'a> {
x: i32,
state: State<'a>, // 里面可能包含 &self.x
}这样就出现了自引用问题:如果这个 Future 被移动,它内部保存的&self.x就可能变成悬空引用。因此,编译器会自动将这种类型标记为 !Unpin,表示:“我必须被固定(pinned)在某个地址上,否则会出错”。总的来说可以这样理解它们的关系:
Pin是一种约束机制 —— 它定义了“哪些值不能被移动”;Unpin是一种豁免机制 —— 它让安全类型不受这个约束影响。 这两者组合起来,既能保证像 Future 这样的复杂自引用状态机不会被随意移动,又不会对普通类型施加不必要的限制。
Poll<T>,问问这个好了嘛?
我们已经解决了“Future 的位置不能动”这个问题。但即使 Future 被安全地固定在原地,它仍然不会自动执行。那么是谁让它“动起来”的呢?这就要说到 Poll<T>,Rust 异步系统中,任务推进的关键机制。
Poll的定义
Poll 是一个非常简单的枚举,定义在标准库的 std::task 模块中:
pub enum Poll<T> {
Ready(T),
Pending,
}它只有两种状态:第一种Ready(T)表示任务已经处理好了,并提交了结果,第二种Pending则是任务还没完成。换句话说:Poll 是 Future 和 Runtime 之间的通信信号。它告诉 runtime:“我现在要么完成了,要么还没准备好,请稍后再 Poll 我。”那这里的runtime是什么呢,我们会在下一章提到这个东西,可以暂时理解为推进任务执行的工具人。所有异步任务都要由他去推进但是不会真的执行,相当于一个“中层领导”。否则,若没有外部事件或唤醒机制,Poll永远是Pending状态。
poll() 的协作模型
我们回到原本的Future,我们看到 poll() 的签名是:
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Self::Output>;这里的逻辑关系非常重要:
| 参数 | 作用 |
|---|---|
self: Pin<&mut Self> | 被固定在内存中的 Future(状态机) |
cx: &mut Context<'_> | 当前执行上下文,里面包含一个 Waker |
返回值:Poll<T> | 任务是否完成或仍在等待 |
| 这意味着每次 poll 调用都是一个“尝试执行一次”的操作: |
- Runtime 调用
poll(); - 如果 Future 内部任务已经完成,返回
Poll::Ready(val); - 否则返回
Poll::Pending,并通过Context注册一个唤醒器(Waker); - 当外部事件(I/O 完成、计时器触发等)发生时,
Waker会通知 runtime; - runtime 再次调用
poll(),任务继续向前推进。 循环往复,直到 Future 最终返回Ready。
poll最小示例
我们来先写一个最小示例:
use std::task::{Context, Poll, Waker};
use std::pin::Pin;
use std::Future::Future;
use std::task::{Context, RawWaker, RawWakerVTable, Waker};
struct Counter {
count: u8,
}
impl Future for Counter {
type Output = u8;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
println!("Start counting...");
self.count += 1;
Poll::Pending
} else if self.count < 3 {
println!("Counting... {}", self.count);
self.count += 1;
Poll::Pending
} else {
println!("Done!");
Poll::Ready(self.count)
}
}
}
fn dummy_waker() -> Waker {
// 这些函数实现 RawWaker 需要的四个操作
fn no_op(_: *const ()) {}
fn clone(_: *const ()) -> RawWaker {
// 这里的 vtable 必须是静态引用,否则生命周期不对
RawWaker::new(std::ptr::null(), &VTABLE)
}
// 定义静态的 vtable,包含四个函数指针
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
// 用 RawWaker 构造安全的 Waker unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
fn main() {
let mut fut = Counter { count: 0 };
let waker = dummy_waker();
let mut ctx = Context::from_waker(&waker);
let mut pinned = Pin::new(&mut fut);
loop {
match pinned.as_mut().poll(&mut ctx) {
Poll::Pending => continue,
Poll::Ready(v) => {
println!("Final value: {}", v);
break;
}
}
}
}这个 Future 每次被 poll 一次,就让 count +1:第一次、第二次返回 Pending,第三次返回 Ready。同时我们创建了一个函数模拟了runtime 反复poll的行为,输出结果类似:
Start counting...
Counting... 1
Counting... 2
Done!
Final value: 3这就是最小的“手工 runtime”,它不断调用 poll(),直到 Future 返回 Ready。
总结,问问我到底好没好
Poll 是 Rust 异步系统的心跳信号,每次调用 poll,状态机就往前走一步。Runtime 不会帮你自动执行 Future,它只是不断地“询问”它是否准备好了。到这里,我们终于可以看清楚整个链条:
Pin:保证 Future 在内存中不动;Unpin:告诉编译器哪些类型可以移动;Poll:让固定好的 Future 一步步被执行;Waker:负责唤醒等待中的任务。 现在我们知道了如何推进 Future,但还缺少一个关键问题:当它暂时无法推进时,谁来通知它再次被执行?
这就引出了 Rust 异步体系中的第四个核心角色:Waker。
Waker:快醒醒,这个任务要做完了
我们知道,poll() 并不是一个持续执行的循环,而是协作式的。Runtime 每次调用 poll(),Future 要么完成、要么返回 Pending。但问题是:Runtime怎么知道什么时候该“再poll一次”?,它总不能一秒钟 poll 一百万次去碰碰运气吧?这时候,Waker 就登场了。
Waker的职责
Waker 是 Future 和 runtime 之间的唤醒机制。它的作用是让 Future 在返回 Poll::Pending 时,告诉 runtime:当条件满足时,请再来 poll 我一次。Waker在标准库中的定义:
pub struct Waker {
waker: RawWaker,
}你可以把它理解成一个函数指针集合,里面定义了 runtime 在“唤醒任务”时应该做什么。 每个 runtime(Tokio、async-std、smol)都会根据自己的任务系统,实现不同的RawWakerVTable。
“Waker”的本质:注册与回调
每当 poll 返回 Pending,Future 通常会做两件事:
- 保存 waker 的克隆,以便稍后使用;
- 当条件满足时(例如 socket 可读、定时器到期),调用:
cx.waker().wake_by_ref();我们前面在 Poll<T> 小节写过一个最小的示例 Counter —— 它在每次 poll() 时自己往前走一步,现在我们给它“装上闹钟”机制,让它能够通过 Waker 自我唤醒,而不是靠外部循环不停地poll。在前面,我们这样定义了 Counter:
struct Counter {
count: u8,
}
impl Future for Counter {
type Output = u8;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
println!("Start counting...");
self.count += 1;
Poll::Pending
} else if self.count < 3 {
println!("Counting... {}", self.count);
self.count += 1;
Poll::Pending
} else {
println!("Done!");
Poll::Ready(self.count)
}
}
}这段代码需要外部循环不停地 poll() 才能前进,我们现在要做的改造是让它自己保存 waker,并在合适的时机唤醒自己:
use std::task::{Context, Poll, Waker};
use std::pin::Pin;
use std::Future::Future;
struct Counter {
count: u8,
waker: Option<Waker>,
}
impl Future for Counter {
type Output = u8;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
println!("Start counting...");
// 保存 waker,以便之后唤醒自己
self.waker = Some(cx.waker().clone());
self.count += 1;
Poll::Pending
} else if self.count < 3 {
println!("Counting... {}", self.count);
self.count += 1;
Poll::Pending
} else {
println!("Done!");
Poll::Ready(self.count)
}
}
}
impl Counter {
fn wake(&mut self) {
if let Some(w) = self.waker.take() {
println!("(wake) Counter manually triggered");
w.wake();
}
}
}我们可以使用之前的 dummy_waker() 来模拟 runtime:
fn dummy_waker() -> Waker {
fn no_op(_: *const ()) {}
fn clone(_: *const ()) -> RawWaker {
RawWaker::new(std::ptr::null(), &VTABLE)
}
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
fn main() {
let mut fut = Counter { count: 0, waker: None };
let waker = dummy_waker();
let mut ctx = Context::from_waker(&waker);
let mut pinned = Pin::new(&mut fut);
// 第一次 poll,注册 waker
let _ = pinned.as_mut().poll(&mut ctx);
// 模拟外部事件发生,调用 wake() 唤醒任务
for _ in 0..3 {
std::thread::sleep(std::time::Duration::from_millis(300));
pinned.as_mut().wake();
let _ = pinned.as_mut().poll(&mut ctx);
}
}执行之后的结果大概是这样:
Start counting...
(wake) Counter manually triggered
Counting... 1
Counting... 2
Done!为什么不是反复waker呢,因为我们这里极大的简化了模型,注册的 waker(cx.waker().clone())是一个“假 waker”,它不会真的触发事件;所以即使手动调用了 wake(),它也没有让任何 runtime 去“再次 poll”。在 Tokio / async-std 这种真正的 runtime 中:cx.waker().clone()返回的是 runtime自己实现的 Waker,waker.wake() 会把任务重新放入任务队列中,Runtime 主循环(Reactor + Scheduler)检测到任务被唤醒后,再次调用 poll()。
总结:唤醒执行
我们到这里可以看到,Waker 本身并不会执行任何任务——它只是一个信号机制。wake() 的作用是“把任务重新丢回调度队列”,告诉 runtime:“我准备好了,请在下一轮循环里再 poll 我一次。” 而真正的执行,仍然由 runtime 主循环完成:
- Future 返回
Pending→ 注册一个Waker; - 外部事件到来 → 调用
wake(); - Runtime 收到信号 → 把该任务放回待执行队列;
- 下一轮调度 → runtime 再次调用
poll(); - Future 前进或完成 → 直到最终返回
Ready。 在我们的简化模型中,这个“事件通知链”都被完成了:wake()打印日志,而poll()是自己循环调用的。但在 Tokio、async-std 或 smol 这样的真正 runtime 中,这套流程是完全自动化的。
回顾:Future 的一生
到这里,我们终于可以把 Rust 异步系统的核心机制串成一条完整的时间线。这一切都围绕着一个中心概念:Future 是一台可暂停、可恢复的状态机。
定义:Future 是状态机
当写下:
async fn foo() -> i32 { 42 }编译器会生成一个实现了 Future 的匿名结构体。它包含了所有局部变量与挂起点,并实现了:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>这就是“异步任务”的核心接口。
固定位置:Pin
由于异步函数会在不同挂起点之间保存局部引用,编译器需要确保它的内存地址不会改变。 Pin 的出现,就是为了让 Future “钉在原地”,防止状态机在被多线程调度时出现自引用失效。
可移动类型:Unpin
大多数类型(如 i32、String)并不存在自引用问题,它们可以安全地被移动。Rust 用 Unpin trait 来标记这些“安全可移动”的类型。
因此:
- 对
Unpin类型,Pin无实质约束; - 对
!Unpin类型(如 Future),Pin阻止它被移动。
运行:Poll
当 runtime 想要执行一个 Future 时,会调用它的 poll()。每次 poll() 都尝试向前推进任务:
| 状态 | 含义 |
|---|---|
Poll::Pending | 暂时无法继续,等待外部事件 |
Poll::Ready(v) | 执行完成,返回结果 |
Future 不会自己“跑起来”,它只是被动地等待 runtime 反复 poll,每次推进一步——直到最终返回 Ready。
唤醒:Waker
当 Future 返回Pending时,它通常会注册一个 Waker。当外部事件(I/O 完成、定时器超时等)发生时,Waker 会调用 wake() 通知 runtime:
“我准备好了,请再次 poll 我。”
于是 runtime 将任务重新放入执行队列,下一轮调度时再次调用 poll(),任务继续前进。
闭环:Runtime 驱动 Future
Runtime(Tokio、async-std 等)就是整个循环的 orchestrator:
loop {
match Future.poll(&mut ctx) {
Pending => 等待事件 → wake() → 再 poll,
Ready(v) => 返回结果 → 结束
}
}Future 只是定义了“如何推进自己”;Waker 提供了“什么时候可以再推进”; Runtime 则负责把这一切组织起来,让成千上万个 Future 同时运转。
执行流程图
尾声
到此,我们完成了对 Rust 异步底层的全景观察:从 Future 的定义,到 Pin 的内存安全,再到Poll/Waker的协作。 这四个概念构成了 Rust 异步系统的基础设施。而下一步,我们将离开“机制层”,进入真正的运行世界。
补充
//这个是伪代码
struct SelfRef {
data: String,
pointer: *const String, // 指向自己的 data 字段
}
fn make_ref() -> SelfRef {
let s = SelfRef {
data: String::from("hello"),
pointer: std::ptr::null(),
};
// pointer 指向 data 自己
let mut s = s;
s.pointer = &s.data;
s
}在这个例子中,SelfRef里面的一个指针指向了自身的一个数据字段的引用。所以SelfRef也叫自引用结构体。
await的补充
await:让 Future 继续向前推进的语法糖
前面我们已经理解了 Future 的本质:它是一个 可暂停、可恢复的状态机,通过 poll() 来一次次推进。但我们在日常写 Rust 异步代码时,很少直接调用 poll(),而是写成:
let v = fut.await;那么问题来了:
.await到底做了什么?- 它是如何让 Future 暂停、恢复、继续执行的?
.await是否会阻塞线程?- 内部的
poll调用链是怎么传播的? 本节我们把这些问题全部讲清楚。
.await 并不会阻塞线程
Rust 的 .await 和其它语言(如 JS、Python)的 await 看起来相似,但语义完全不同。 在 Rust 中:
.await不会阻塞线程,它只会让当前 Future 暂停(返回 Pending),让线程去执行别的任务。
你写下:
let x = fut.await;编译器会展开成类似如下的状态机片段(伪代码):
loop {
match Pin::new(&mut fut).poll(cx) {
Poll::Ready(v) => break v,
Poll::Pending => return Poll::Pending,
}
}当 fut 还没准备好时:
- 状态机直接
return Poll::Pending - 线程被释放出去,执行其他 task
- 不会有任何阻塞行为 因此:await 阻塞的不是线程,而是当前 Future 的执行进度。 线程永远不会被 await 卡住。
.await 是一个状态机跳转点
每个 await 都是一个“挂起点”,编译器会为其生成一个状态,例如:
async fn example() {
step1().await;
step2().await;
}会变成(伪代码):
enum State {
Step1,
Step2,
Done,
}await 本质上是:“执行 step1 的 Future,如果它返回 Pending,那么暂停整个 example(),等待唤醒后再从 Step1 继续执行。”线程完全不参与等待。
3. .await 的内部流程(完整执行链)
下面是 .await 从挂起到恢复的完整执行顺序,你之后写 runtime 小节时可以复用这张链条:
async fn foo() {
fut.await;
}展开后实际上是:
- Runtime poll
foo() foopollfutfut返回Pending并注册 Wakerfoo也返回Pending- 线程继续执行其他任务(无阻塞)
- 某事件触发(I/O、timer、信号等)
- Waker::wake() → 将
foo重新放回调度队列 - Runtime 下轮 poll
foo foo再次 pollfut- 如果
futReady → 返回值 → 状态前进到下一个 await 因此:await = poll 子 future + 挂起状态机 + 等待唤醒 + 恢复执行 没有任何同步阻塞。
为什么 .await 必须在 async 内部?
因为它必须被编译成状态机跳转点,需要编译器在 async 语法糖下展开。例如:
let v = fut.await; //错误:必须在 async 中async 是状态机生成器,await 是状态机的分支迁移点,两者缺一不可。
总结
await 的本质是:
对 Future 调用 poll,一旦返回 Pending,就立即暂停当前 async 函数,并把线程让给其他任务;当 Future 准备就绪时,由 Waker 唤醒,再次 poll,恢复状态机的执行。 它从不阻塞线程,只暂停 Future 自己的执行进度。