理解Rust异步编程(2)
理解 Rust 异步编程:深入Tokio,理解Rust异步运行时的灵魂
在上一章《理解 Rust 异步编程:从 Send/Sync 到 Future、Waker、Poll》中,我们已经搞清楚了几件事:
async fn在编译期会被降级成一个Future状态机;- 这个状态机通过
poll一步步向前推进; Pin负责保证状态机在内存中的位置不被随意移动;Waker负责在 I/O、定时器等事件就绪时把任务“叫醒”。 但有了这些“零件”还不够,这些被编译出来的 Future 谁来不断 poll?谁来管理时间和 I/O?谁来调度成千上万的任务? 这一层,就是本篇要讲的主角:异步运行时(Asynchronous Runtime)。 在这一章,我们会围绕三个问题展开:
- Runtime 到底是什么?
它在整个异步系统里扮演什么角色,和Future之间是什么关系? - Tokio vs async-std 是什么区别?
它们都叫 Runtime,本质上在模型和实现上有何不同? - 如果我要自己写一个 Runtime,大概需要具备哪些组件?
要覆盖 I/O、定时器、调度、阻塞任务还是说其他什么东西?
Runtime 是什么?
在上一章我们已经看到:Rust 里所有的 async 代码,最终都会被编译成一个个 Future 状态机。
但 Future 自己是不会动的——它只提供了一个接口:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;真正让这群 Future“活过来”的,是一个在背后不断选出下一个要执行的任务,调用它的 poll并根据 Ready / Pending 决定下一步动作的“发动机”。这个发动机,就是 Runtime。 如果用一句话来概括:Runtime 就是负责驱动所有 Future 的引擎,并把整个异步系统串在一起。 回顾上一章的内容,我们可以很自然地列出,一个“最小可用”的 Runtime 至少要解决这些问题:
- Future 本身的状态推进:不断调用
poll,让状态机从Pending走到Ready。 Poll的执行机制:在什么线程、以什么顺序、用什么策略去poll一大堆任务。Waker的调度:当 I/O 或计时器就绪时,如何通过Waker把对应的任务重新放回队列。- 接入操作系统 I/O:使用 epoll/kqueue/IOCP 等机制,把“等待事件”交给内核完成。
在此基础之上,一个工程上“好用”的 Runtime,还往往需要: 5. 整体调度策略:如何进行多线程调度、使用什么方法让调度更加高性能等。 6. 时间管理:使用定时器、时间轮还是其他。 7. 并发事件管理:如何处理大量连接、大量任务执行的公平性问题或者保证吞吐性能。 8. 阻塞任务管理:如何处理 spawn_blocking、block_in_place 这种“必须阻塞”的场景。 本篇会从 Tokio 入手,先把它的运行时结构拆解,再对比一下其他 Runtime(例如 async-std),最后再回到一个问题:如果要自己设计一个 Runtime,需要注意哪些问题?
Tokio:高性能异步运行时
首先我们可以直接拿出一张大图:
也许会有点懵,其实这个基本是tokio runtime内部的运行流程,接下来我们一步一步进入tokio runtime的世界。
启动Runtime
在启动阶段,我们会从上面的流程图看到这几个步骤:
这其实就是我们平时写 Tokio 程序时,那几行看起来理所当然的启动代码对应的内部流程,对应我们使用tokio的创建主线程代码:
// 默认就是 multi_thread + enable_all
#[tokio::main]
async fn main() {
// 这里开始,就是运行在 Tokio Runtime 里的异步世界
let handle = tokio::spawn(async {
// ...
});
handle.await.unwrap();
}#[tokio::main] 做的事情,大致可以展开成下面这样(省略了一些默认配置):
use tokio::runtime::Builder;
async fn async_main() {
let handle = tokio::spawn(async {
// 这里可以用 tokio 的 IO、time 等功能
});
handle.await.unwrap();
}
fn main() {
// 1. Runtime::new / Builder
let runtime = Builder::new_multi_thread()
// 2. 选择 Runtime 类型:multi_thread
.worker_threads(4) // 创建 4 个 worker 线程
// 3. 创建 Driver:IO + Timer + Signal
.enable_io()
.enable_time()
// 4. 创建 Blocking 线程池(内部会自动初始化)
.max_blocking_threads(32)
// 5. 构建出完整 Runtime
.build()
.expect("failed to build runtime");
// 6. 进入 block_on(async_main)
runtime.block_on(async_main());
}换句话说:
Builder::new_multi_thread()/new_current_thread()→ 对应流程图里的「选择 Runtime 类型」分支;.enable_io().enable_time()(或.enable_all()) → 对应「创建 Driver:IO + Timer + Signal」;.max_blocking_threads(...)→ 对应「创建 Blocking 线程池」;.build()→ 把调度器、driver、blocking 池这些部件全部组装起来;runtime.block_on(async_main())→ 正式“点火”,开始驱动async_main这个 Future。
multi_thread vs current_thread:
multi_thread也就是多线程模式,在这个模式下Tokio 会创建一组 worker 线程,把任务分发到不同线程上执行,这是服务器场景几乎默认的选择,也是tokio完整的状态(以下会提到),worker线程数量需要和当前计算机的CPU线程数对应,以此才能发挥全部性能(因为worker本质上是一个poll future的CPU密集的工作,所以线程数大于核心数只会带来更多不必要的上下文切换。当然要是有需求的话也是可以超出线程数量的);current_thread那就是单线程模式,所有任务都在当前线程上调度,适合嵌入到现有的单线程环境(例如 GUI 主线程、游戏主循环)中。 Driver & Blocking 池:Driver 负责把 Runtime 接到操作系统上:IO 复用(epoll/kqueue)、定时器、信号等都从这里接入。而Blocking线程池专门用来承接spawn_blocking/block_in_place这类“必须阻塞”的任务,避免把 worker 线程卡死,这一块会在后文单独展开。
所以说,这张初始化流程图要表达的就是:当我们写下一个 #[tokio::main],Tokio 在背后帮我们初始化了一整套异步环境——线程池、调度器、驱动器、阻塞池都准备好之后,才开始真正去 poll 的 async main。
任务创建与调度
接下来我们到了这个阶段:
从调用的视角看,这一步非常简单:
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// 这里是子任务的异步逻辑
// 比如读写网络、sleep、访问数据库……
42
});
// JoinHandle 本身也是一个 Future
let result = handle.await.unwrap();
println!("child result = {result}");
}但在 Runtime 内部,这一行 tokio::spawn(...) 实际上触发了三件事:
- 把写的 async 代码,封装成一个可管理的 Task
- 把这个 Task 交给调度器(Scheduler)
- 返回一个
JoinHandle,让调用方可以在之后拿回结果 下面按这三个步骤来一步步分析。
Future → Task:把状态机装进“容器”
我们知道,首先async { ... } 在编译期会变成一个 Future 状态机。tokio::spawn 做的第一件事,就是把这个 Future 装进 Tokio 自己的 Task 容器里,这个容器会额外记录:
- 任务的当前状态(正在运行、挂起、已完成、被取消……)
- 任务的唯一 ID、统计信息(用于调试和内部 metrics)
- 用来唤醒等待者(
JoinHandle)的 waker 可以把它想象成:Task = Future + 状态位 + 引用计数 + JoinHandle 的 waker + 调度器引用 这样 Runtime 就不再直接操作的 Future,而是对一个Task结构做事情: - 给它加减引用计数(决定何时释放内存)
- 从队列里取出/放回
- 在合适的时候
poll里面的Future - 在任务完成后,把结果写回给
JoinHandle
Scheduler:把 Task 丢到哪个 worker 上跑?
接着是把这个新 Task 丢进调度器的队列里。在多线程 Runtime 下(new_multi_thread),调度器内部维护了每个 worker 线程自己的本地队列(local queue)以及一个全局队列(inject queue),用于跨线程投递任务。tokio::spawn创建 Task 后,会通过 scheduler::Handle尽量把任务放到当前 worker 的本地队列(提高缓存命中),如果是在 Runtime 外部线程调用 spawn,就会走全局注入队列 在单线程 Runtime 下(new_current_thread),事情就简单得多:所有任务都在当前线程排队,调度器只维护一个队列。 总体上,这一步对应的动作就是:
tokio::spawn(async { ... })
→ 封装成 Task
→ 调度器把 Task 入队
→ 某个 worker 在线程循环中把它取出来 pollJoinHandle:给调用方一个“句柄”
最后tokio::spawn 会返回一个 JoinHandle<T>:
let handle: tokio::task::JoinHandle<i32> = tokio::spawn(async {
42
});JoinHandle<T> 本身也是一个 Future<Output = Result<T, JoinError>>,当子任务完成时:Task 会把 Output 写到自己的内部缓冲区里,并唤醒 JoinHandle 里注册的 waker 在 main 里 handle.await,其实就是在等待:要么拿到子任务的返回值T;要么拿到一个 JoinError(比如任务被取消、内部 panic)。 这个“句柄”有两个重要含义:
- 子任务生命周期的“拥有者”:
当丢掉JoinHandle而不.await时,相当于告诉 Runtime:“我不关心结果了,自己跑完就释放就好”。 - 错误传播的通道:
子任务里的 panic,不会直接炸穿整个程序进程,而是被包装成JoinError交给持有JoinHandle的那一方处理。 到这里,我们可以把tokio::spawn这一阶段的心智模型总结成一句话: Future 状态机 → Task 容器 → 交给调度器 → 返回一个可等待的 JoinHandle。
worker 执行任务
这些 Task 被放入队列之后,Tokio的worker 线程是如何不断从队列里取出任务、调用 poll、在 Ready/Pending 之间切换,并在没有任务时优雅地睡觉。以下是执行流程
下面我们按这个流程分成三部分来看。
worker 主循环:从队列里“领活干”
在 multi_thread 模式下,Runtime 会预先创建一组 worker 线程。每个 worker 的职责很简单: 不停地从自己的任务队列里拿 Task 出来,调用它里面的 Future 的 poll。 如果用伪代码来描述一个 worker 的主循环,大致是这样:
loop {
// 1. 先从本地队列里拿任务(没有的话再尝试从全局队列 / 其他 worker 偷)
let task = scheduler.next_task_for_this_worker();
match task {
Some(task) => {
// 2. 拿到 Task,就对它里面的 Future 做一次 poll
poll_one_task(task);
}
None => {
// 3. 实在没活干了,就进入休眠(park),等待新的任务或 IO/Timer 事件
scheduler.park_current_worker();
}
}
}真正的 Tokio 源码里还有 LIFO slot、work-stealing 等优化,但是我们这里可以做一些简化:
- 每个 worker 就是一个“任务执行循环”;
- 有任务就 poll,没有任务就让自己睡一会;
- 长时间没有任务时,会进入
park,交给底层 Driver 去等待 IO 和定时器。
poll 返回 Ready:Future 完成,收尾 + 通知 JoinHandle
当 worker 从队列里拿到一个 Task,会通过它内部的 Harness 调用一次 poll:
// 伪代码
match task.poll() {
Poll::Ready(output) => { /* 走完成路径 */ }
Poll::Pending => { /* 走挂起路径 */ }
}如果这次 poll 返回的是 Ready(output),说明这个 Future 的状态机已经执行到终点了。这时 Tokio 会做几件事:
- 把结果写进 Task 内部的存储区域
- 通知等待这个任务结果的
JoinHandle,Task 会检查有没有人关心结果(JOIN_INTEREST),如果有,就通过之前注册好的 waker 把对应的JoinHandle唤醒;于是在上层.await handle那一行就会被重新调度,拿到子任务返回值。 - 释放 Task 的资源
poll返回 Pending:注册 Waker,然后去做别的活
更常见的情况是:某个 Future 暂时干不下去了,需要等 IO、sleep、channel 等事件,这时它会返回 Poll::Pending。 在这个分支里,有两个关键点:
- Waker 的注册发生在 Future 内部 这里有一个例子:
async fn handler(stream: &mut TcpStream) {
let mut buf = [0u8; 1024];
// 这行在编译后会变成poll 一个底层的 IO Future
let n = stream.read(&mut buf).await.unwrap();
// ...
}这里的read().await对应的那个 IO Future,当它第一次被 poll 时,它会把传进来的 Context 里的 Waker,注册到底层 IO driver(epoll/kqueue 等)里,然后返回 Poll::Pending,告诉 Tokio:“我现在没法继续,等内核那边有数据了再叫醒我”。 也就是说,worker 自己并不关心 waker 是怎么注册的,它只是调了一次 poll,结果是 Pending,就知道这个任务目前不能继续前进,可以把时间让给别的任务了。 2. IO/Timer 就绪发生什么 当底层 IO driver 或 Timer 发现对应事件已经就绪,会调用之前注册的 waker(waker.wake();),这个 wake() 的最终效果就是:
- 把对应的 Task 重新放回某个 worker 的任务队列(或者全局队列);
- 等该 worker 下一轮循环
next_task()时,再次取出这个 Task; - 对里面的 Future 再
poll一次,从上次await的位置继续往后执行。
把这三步串起来,其实 worker 做的事情可以用很朴素的一句话概括:worker = “有任务就 poll 一下,没任务就睡觉,poll 完成就返回,poll Pending 就略过,让driver主动唤醒我,自己去干别的事。 下一节,我们就顺着 Pending 这条支线,去看一眼 Waker 和底层 Driver(IO + Timer)是如何协作的:
也就是:Task 挂起之后,它到底被挂在哪,内核事件发生时又是怎么把它叫回来的。
等待 & 唤醒
前面在 worker 执行任务 那一节里,我们看到:当某个 Future 暂时干不下去时,会返回 Poll::Pending,并在内部把自己的 Waker 注册到对应的资源上。接下来,就轮到 Runtime 里的 Driver 出场了:
可以把这一段简单理解成:“谁来帮我们盯着 IO/定时器/信号这些事件,并在合适的时机帮我们调用 waker.wake()?” 答案就是 Driver。
从 Pending 到 Driver:Waker 被挂在哪?
还拿一个熟悉的例子:
async fn handle(stream: &mut TcpStream) {
let mut buf = [0u8; 1024];
// 这行在编译后会变成一个 IO Future 的 poll 调用
let n = stream.read(&mut buf).await.unwrap();
println!("read {n} bytes");
}第一次 poll 这个 read() 对应的 IO Future 时,大致会做几件事:
- 把传进来的
Context<'_>里的Waker拿出来; - 把这个
waker跟底层的socket/注册 key绑在一起; - 调用底层 IO 接口(epoll/kqueue/IOCP),告诉内核:“当这个fd上可以读了,就把我叫醒(触发一个事件)。” 然后,这个 Future 返回
Poll::Pending,worker 线程继续去干别的 Task 了。 所以图里的这一句:「注册 Waker,交给 IO/Timer Driver 监听」 本质就是:Future 在自己的poll里,拿着 waker 去和具体资源(socket / timer / signal)绑定起来。
Driver 在做什么:阻塞等待系统事件
接下来轮到 Driver 出场。Tokio 会在 Runtime 初始化时创建一个统一的 Driver 结构,把几种“等待型资源”包起来:
- IO Driver:负责 socket、管道、文件描述符等;
- Timer Driver:负责定时器、
sleep、timeout; - Signal / 子进程 Driver:负责操作系统信号、进程退出等。 当 worker 发现暂时没有任务可执行时,会进入
park,把“等待的责任”交给 Driver:
loop {
if let Some(task) = scheduler.next_task_for_this_worker() {
poll_one_task(task);
} else {
// 把当前线程交给 Driver 管
driver.park(); // 或者 park_timeout(...)
}
}driver.park() 做的事情可以粗暴地理解成两句:
- 按照最近一个定时器的到期时间,计算出一个
timeout; - 调用底层的 IO 复用接口(epoll_wait / kevent / 等等),阻塞等待事件或超时。
有事件发生:通过 Waker 把 Task 推回调度器
当 driver.park() 从系统调用里醒来(要么有事件,要么 timeout),当有事件发生的时候,流程就会走向“是”的分支。 这一步的核心逻辑是:IO/Timer/Signal driver 根据内核返回的事件,找到对应的“注册对象”(例如某个 socket、某个 TimerHandle)。在这些对象里,之前已经记录好了一个 Waker(就是 Future poll 时传进去的那个)。最后调用这个 waker 的 wake() 方法。 那么wake()就走下面这个流程:
- 找到对应的 Task(因为 waker 里绑着 Task 的指针/句柄);
- 把这个 Task 丢回到某个 worker 的任务队列里(或全局队列);
- 避免重复入队(内部会有状态位保证“唤醒一次,只排队一次”)。 这样下一轮循环时,某个 worker 会通过
next_task()拿到这个 Task,并再次poll它里面的 Future,从而查看当前Future是否需要返回。 用一句话描述就是:外界事件 → Driver 接收 → 找到对应 waker →waker.wake()→ Task 回到调度队列 → 下次被 worker 再次poll
没有事件:继续睡,直到有事可做
如果 driver.park() 一段时间内都等不到任何事件,或者等待时间未到、暂时也没有新任务可以调度,那么driver则会继续休眠,下一个事件或者是新任务,那为什么要这么做呢,这就是tokio的巧妙设计之处:没有任务,又没有 IO/Timer 事件,就不要白白占着 CPU。 具体的表现就是:Driver 会让当前线程直接睡一小段时间(或者直到下一次定时器到期 / 有 IO 事件),再重新进入 park 循环。在这个过程中会让出CPU资源。 所以,从宏观上看:
- 当有大量任务或事件时:worker 和 driver 会非常活跃地在“取任务 → poll → 注册 waker → park → 事件唤醒 → 再 poll”之间循环;
- 当几乎没有任务时:worker 线程会逐渐全部
park,CPU 使用率降到很低,直到新的连接、请求或定时器打破平静。 到这里,我们可以把「等待 & 唤醒」这一环总结成一句完整的话: 当一个 Future 返回Pending时,它会在自己的poll里把Waker注册到具体资源上,然后交给 Driver 去“盯着”阻塞等待IO/定时器/信号事件,事件到来时,通过 waker 把对应的 Task 推回调度队列。等下次被 worker 取出来poll时,Future 状态机从上一次await的位置继续向前推进。 这样,从tokio::spawn到 worker 调度,再到 Driver 等待与唤醒,我们就把 Tokio Runtime 的主循环完整跑了一遍。
阻塞
上面整个过程基本是tokio的整体异步任务处理流程了,在现实中,异步任务不一定都是非阻塞的,也有需要阻塞的代码,那么tokio如何处理呢?我们可以看看这个流程图:
spawn_blocking:专门的 Blocking 线程池
当我们需要阻塞任务的时候,一般是这样处理的:
async fn handle_req(...) {
// 一些 async 操作...
let data = tokio::task::spawn_blocking(move || {
// 这里是阻塞世界:可以放心用老库 / 重 IO / 重 CPU
std::thread::sleep(std::time::Duration::from_millis(500));
heavy_compute()
})
.await
.unwrap(); // JoinError
// 拿到结果后,继续 async 流程...
}那么这段代码会发生什么呢?
- 在 async 任务里调用
spawn_blocking时:Tokio 会把传入的闭包move || { ... }封装成一个“普通阻塞任务”,丢到单独的 blocking 线程池 中;这个线程池和前面的 worker 线程池是 两套资源,有单独的最大线程数(max_blocking_threads)。 - 当前 async 任务这边:
spawn_blocking(...)会返回一个JoinHandle<T>,当await它的时候,当前 Future 会像别的 IO 一样返回Pending,把自己挂起;等阻塞线程池那边跑完闭包之后,把结果写回、唤醒对应的 waker,此时的 async 任务再被调度起来,继续往下执行。 用大白话说就是:spawn_blocking= 把这段阻塞代码扔给另一波线程干,自己这边 async 任务先挂起,等他们干完再继续。 这保证了worker 线程只负责驱动 async Future,不负责具体的阻塞,并且阻塞任务有专用的阻塞池,所以不会影响整个Runtime的调度。
block_in_place:在多线程 Runtime 里“让位”
还有一种情况:已经在多线程 Runtime 的 worker 线程里了,而且:
- 调用了一个必须阻塞的 API;
- 又希望尽量少引入跨线程通信(例如要用到当前线程的一些局部状态)。 这时 Tokio 提供了一个“临时挪车位”的 API:
block_in_place。伪代码大概这样:
use tokio::task::block_in_place;
async fn handle_req(...) {
// 前面都是 async 逻辑
let result = block_in_place(|| {
// 这里是同步阻塞区域,但不会卡死整个 worker 调度
legacy_blocking_call()
});
// result 是 legacy_blocking_call 的返回值
// 后面继续 async 逻辑……
}内部的整体流程可以概括成 4 步:
- 确认当前线程确实是多线程 Runtime 的 worker
- 把当前 worker 的调度核心 Core“腾出来”
- 把这个 worker 的 Core 交给新的线程去跑
- 当前线程退出 Runtime 上下文,并执行阻塞闭包** 所以
block_in_place的语义可以理解为:“我现在在 worker 上,但我要临时去阻塞一会儿。Core 帮我交给别的线程接着跑,我这条线程先退一会儿队,干完再回来。”
这些工作是为了保证阻塞任务不会阻塞整个runtime从而导致调度停摆。 结合前面的调度流程,我们可以给阻塞这块来一句总结: 正常 async 任务:由 worker 线程 + Driver 共同驱动,有任务就 poll,没任务就 park,等待 IO / Timer 事件。 必须阻塞的任务:用 spawn_blocking 丢到 Blocking 池,避免卡住 worker,在多线程 Runtime 内部,还可以用 block_in_place 把 Core 转移给新线程,让当前线程专心阻塞。
总结
这一章其实就讲了一件事:Tokio 是怎么接手 Rust 里的 Future,把它们跑起来的。 因为所有 async fn 在编译期都会变成 Future 状态机,本身不会自己动。需要异步运行时驱动,此章介绍了tokio整个运行流程,从初始化Runtime到具体的任务处理。那么下一章我们将会探讨争议最高的async-std与tokio到底有何不同。
async-std vs tokio
上一章我们了解了整个tokio的处理流程,那么在这一章里面,我们将探讨和tokio争议最大的async-std异步运行时库,我们将从两个方面入手,第一,async-std流程是什么,和tokio有何区别;第二,为何不能混用两者。
async-std基本流程
我们可以从此大图查看整个async-std处理流程:
我们可以从以上流程直观地看出来,async-std 的整体形状和 Tokio 很像,但很多关键点做法不一样,大概有这几层差异:
- 全局 Runtime,而不是显式
Runtime对象
在 Tokio 里会显式地构建一个Runtime,然后runtime.block_on(...);
而在 async-std 里,async_std::task::block_on和#[async_std::main]会帮懒初始化一个全局的 Executor + Reactor: 第一次进入block_on时:创建线程池 + async-io Reactor + 定时器轮;后续再调用block_on或spawn时,直接复用这套全局 runtime。 这也解释了为什么 async-std 的 API 风格更 “像 std”:几乎感受不到 Runtime 这个显式概念。 - Runnable“可再次执行的 Task 外壳”+JoinHandle等结果 vs JoinHandle一把梭
在 Tokio 里:JoinHandle既是结果通道,又在 Task 生命周期里“占了一个身份”;
在 async-std 里,async_task::spawn会把任务拆成两块:Runnable:真正会被 Executor 线程反复拿出来run()的那块(内部封装了 Future + 状态 + waker 等);JoinHandle<T>:只是“有人在等结果吗?”的那一头,本身是一个 Future,但基本不参与调度逻辑。
换句话说:在 async-std 里,“谁在跑任务”和“谁在等结果”是更彻底分离的,图里 S1 就是:spawn→ 生成Runnable + JoinHandle<T>→ 把Runnable丢进队列 → 把JoinHandle还给调用方。 - 执行模型:统一的 Executor 事件循环 vs async-io Reactor Executor 线程做的事很简单:取一个
Runnable→run()一次 → 里面的 Futurepoll成 Ready 就写回输出、唤醒 JoinHandle;Pending 就让 Future 自己把 waker 挂到 socket/timer 上。 Reactor 线程则负责真正的 “等待”:阻塞在 epoll/kevent/IOCP 上,事件来了就根据注册信息找到 waker,wake()后把对应Runnable再次丢回 Executor 队列。
整体仍然是:“有任务就 poll,没任务就让内核帮忙盯着 IO/定时器”,只是 async-std 通过 async-io 这个库把 IO/定时器切分出去了。 - 阻塞处理策略:阻塞分离 vs 阻塞检测
Tokio 把阻塞任务和异步任务显式地分开:spawn_blocking丢到专门的 blocking 线程池;block_in_place在 multi-thread runtime里挪 Core; 而 async-std 采用的是统一任务模型 + 阻塞检测 / 线程扩展:表面上看,所有任务都是spawn出来的Runnable;Executor 如果发现某个线程长时间被一个任务占住,会把这个线程视为“阻塞”,再拉起新的 Executor 线程补位。总体区别是async-std 是在“自动帮判定谁是阻塞任务”,Tokio 则要求显式标记哪些是阻塞代码。 由于本文是以 Tokio 为主线,目标是探讨 “一个异步 Runtime 的主要构成”,所以对 async-std 我们只做结构级别的对照,对于执行细节,本文不作详细的探讨。总的来说async-std和tokio本身的结构差异还是十分大的,那么为什么不能混用这两个异步运行时呢?我们将会在下一节继续探讨。
async-std与tokio
其实上一节的结构差异,几乎回答了“为什么不能随便混用这两个异步 runtime”这个问题答案。这一节我们把这个答案剩下的讲清楚。我们可以从三个角度来切入:
IO / Timer / FS 类型强绑定各自的 Runtime
最直观的一条:Tokio 和 async-std 自己都提供了一整套“IO 家族”:
- Tokio:
tokio::net::TcpStream/UdpSocket/TcpListener/tokio::fs::File/tokio::time::sleep… - async-std:
async_std::net::TcpStream/UdpSocket/TcpListener/async_std::fs::File/async_std::task::sleep… 这些类型不是“纯粹的Future封装”,而是深度绑定了各自 runtime 的 Reactor / Driver,比如: tokio::net::TcpStream在第一次poll_read时,会向 Tokio 的 IO driver 注册 waker,期望是:只有在 Tokio 的 runtime(有 IO driver 的上下文)里 poll,它才知道去哪儿挂 waker、从哪儿拿事件。async_std::net::TcpStream则通过 async-io 自己那套 Reactor(async_io::Timer、async_io::Async<...>)做注册。 如果在 Tokio 里用 async-std 的 IO 类型,或在 async-std 里用 Tokio 的 IO 类型,会出现经典问题:“没有运行中的 reactor/driver”或者 IO 一直 Pending、不唤醒,表现为请求“卡死”。
两套调度循环,互不干扰,也互不交流
调度层面,Tokio 和 async-std 对线程 / 阻塞的策略也不一样:
- Tokio:有一个明确的“worker 线程池 + blocking 线程池”,使用阻塞时,要显式
spawn_blocking/block_in_place; - async-std:采用 async-global-executor + async-io,一开始线程少,发现某个线程被占满就“判定为阻塞”,再拉起额外线程来顶上。 那么在实际的操作中就会发生一些微妙的问题,这里有一个例子:
#[tokio::main]
async fn main() {
// 在 Tokio runtime 里起一个 async-std 任务
async_std::task::spawn(async {
// 里面再偷偷用 Tokio 的东西……
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
});
}那么在实际的运行中,整段代码会直接panic,因为async_std找不到对应的上下文,当我们换成这段代码时:
#[tokio::main]
async fn main() {
// 在 Tokio runtime 里起一个 async-std 任务
let handle = tokio::runtime::Handle::current();
async_std::task::spawn(async move {
let _guard = handle.enter(); // 手动在当前线程(async-std 的线程)注入 Tokio 上下文
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
});
}这段代码是成功运行的,但是他会出现以下问题:
- 里面的任务是由tokio的runtime进行管理的,但是却提交至async-std的runtime上下文中去了。
- 一旦这个任务被调度到另一个线程上,而你没有在那个线程里再
handle.enter(),Tokio 的 IO/Timer 一样会因为缺少上下文而 panic。 - task在poll的时候,整个上下文是tokio的,但是waker却是async-std的。虽然在抽象层两者都是std
Waker,但是实际上具体实现却是天差地别。
收益极小,复杂度极大
从“理论上”讲,可以用一些兼容层,比如:
- 把某一边暴露为
Future<Output = T>,在另一边spawn/block_on; - 或者用兼容 crate(
async_compat一类)在 Tokio 里跑 async-std 的 Future 反之亦然。 但工程上几乎没人会这么做,原因很现实:
- 调试难度陡然上升:如果程序出现崩溃,要想清楚“这个 Future 当前是被哪个 runtime poll 的”?
- 约束条件变多:要记住“哪些模块只能用 Tokio 的 API,哪些模块只能用 async-std 的 API”,任何人只要忘了这层约束,很容易写出“在 async-std 里用 Tokio IO”这种逻辑上无法保证正确、行为极难预期的代码。
- 收益非常有限:两个runtime实现库几乎都是独立生态,每一种功能都有对应的做法,跨库调用的收益不如直接调用或者是基于库本身再度自定义。
总结
实际上,我们可以在一个进程里同时引入 Tokio 和 async-std,但它们是两套各自完整的异步世界:各自有自己的 IO 类型、Reactor、调度策略和阻塞假设。
想强行混用,就相当于在一个城市里修两套互不通信的地铁系统——不是不能修,而是: 成本极高、复杂度极高,而且几乎没有比“选一个并用好它”更好的收益。
题外话:为什么要选择tokio作为本系列的主要研究对象,是因为tokio拥有最大的并发工具链生态, 并且从上面的结构可以看出,在典型的异步非阻塞服务场景里,Tokio 的设计更偏向高吞吐和可控的阻塞策略,也更贴近目前主流生态的使用方式。 再者说,tokio几乎成为了Rust异步模型领域的首要解决方案。 基于此,本系列才选择tokio作为研究对象。
自己动手写一个runtime
⚠️ 本节属于进阶内容:我们会从零实现一个最小 async runtime, 会涉及
JoinHandle、SharedState、ArcWake等细节。 如果你暂时只想搞清楚现有async runtime的整体结构,可以直接跳过本节。
上面我们详细分析了tokio runtime的内部结构以及tokio和async-std之间的结构对比,那么如果我们要自己手动搓一个runtime该这么做呢,我们回顾以下一开始的问题:
- Future 本身的状态推进:不断调用
poll,让状态机从Pending走到Ready。 Poll的执行机制:在什么线程、以什么顺序、用什么策略去poll一大堆任务。Waker的调度:当 I/O 或计时器就绪时,如何通过Waker把对应的任务重新放回队列。- 接入操作系统 I/O:使用 epoll/kqueue/IOCP 等机制,把“等待事件”交给内核完成。
这四个问题几乎是一个async runtime必须要解决的问题,你也许会说,难道后面四个问题就不是了?其实并非,只是受限于文章篇幅,这里只做最核心的开发。 那么通过上面结构学习我们可以参考哪些设计?tokio和async-std的结构我们可以推导出设计runtime有以下元素:
- Task:基本任务主体
- JoinHandle:任务上下文包装
- Scheduler:负责调度任务
- Worker:负责跟踪和推动任务
- TaskQueue:任务队列
- Executer:任务驱动器
- Park:worker休眠
但是若全部实现这些元素,几乎快到 Tokio 正式版的复杂度了,这样一步到位未免过于复杂了,其实我们可以从基本元素入手,那么最基本的元素有哪些呢?只有这些:
- Task:基本任务主体
- Scheduler:负责调度任务
- Executer:任务驱动器 基于以上元素,我们可以的出来这么一个流程图:
现在整体的流程图有了,我们来看看具体的代码设计:
use std::{collections::VecDeque, pin::Pin, sync::Arc, task::Context};
use futures::task::noop_waker;
pub struct Task {
pub future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
}
impl Task {
pub fn new(fut: impl Future+ Send + 'static) -> Self {
Self {
future: Box::pin(fut),
}
}
}
pub struct Executor {
ready_queue: VecDeque>,
}
impl Executor {
pub fn spawn(&mut self, fut: F)
where
F: Future + Send + 'static,
{
let task = Task::new(fut);
self.ready_queue.push_back(task);
}
pub fn block_on(&mut self, fut: F)
where
F: Future + Send + 'static,
{
self.spawn(fut);
self.run();
}
pub fn run(&mut self) {
while let Some(mut task) = self.ready_queue.pop_front() {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match task.future.as_mut().poll(&mut cx) {
std::task::Poll::Ready(_) => {
println!("task is ready");
}
std::task::Poll::Pending => {
self.ready_queue.push_back(task);
}
}
}
}
}
fn main() {
let mut exec = Executor {
ready_queue: VecDeque::new(),
};
exec.spawn(async {
println!("hello from task 1");
});
exec.block_on(async {
println!("hello from task 2");
});
exec.run();
}你也许会问,这里没有任何的poll和waker改变future的状态,但是代码为何会执行?这是因为编译器自己将异步闭包:
async {
println!("hello from task 1");
}自己返回了一个Ready,这里我们用一下命令:
cargo +nightly rustc -- -Zunpretty=mir > mir.txt生成出来的mir.txt里面找到我们这一段闭包:
fn main::{closure#0}(_1: Pin<&mut {async block@src/main.rs:75:16: 75:21}>, _2: &mut Context<'_>) -> Poll<()> {
debug _task_context => _2;
let mut _0: std::task::Poll<()>;
let _3: ();
let mut _4: std::fmt::Arguments<'_>;
let mut _5: u32;
let mut _6: &mut {async block@src/main.rs:75:16: 75:21};
bb0: {
_6 = copy (_1.0: &mut {async block@src/main.rs:75:16: 75:21});
_5 = discriminant((*_6));
switchInt(move _5) -> [0: bb1, 1: bb6, 2: bb5, otherwise: bb7];
}
bb1: {
_4 = Arguments::<'_>::from_str(const "hello from task 1\n") -> [return: bb2, unwind: bb4];
}
bb2: {
_3 = _print(move _4) -> [return: bb3, unwind: bb4];
}
bb3: {
_0 = Poll::<()>::Ready(const ());
discriminant((*_6)) = 1;
return;
}
bb4 (cleanup): {
discriminant((*_6)) = 2;
resume;
}
bb5: {
assert(const false, "`async fn` resumed after panicking") -> [success: bb5, unwind continue];
}
bb6: {
assert(const false, "`async fn` resumed after completion") -> [success: bb6, unwind continue];
}
bb7: {
unreachable;
}
}我们会看到:
fn main::{closure#0}(_1: Pin<&mut {async block@src/main.rs:75:16: 75:21}>, _2: &mut Context<'_>) -> Poll<()>以及:
bb3: {
_0 = Poll::<()>::Ready(const ());
discriminant((*_6)) = 1;
return;
}这里就表明了,async{}闭包实际上是一个f:Pin -> Context -> Poll的一个东西,并且在编译的时候直接的就返回了Ready。 那我们回过头看现在这个实现,我们将Future本身的返回值直接取消,直接流转整个future。那么如何让他可以返回呢?我们先把spwan先放一边,可以先写block_on,具体实现:
use std::{collections::VecDeque, pin::Pin, task::Context};
use futures::task::noop_waker;
pub struct Task {
pub future: Pin<Box + Send + 'static>>,
}
impl Task {
pub fn new(fut: impl Future + Send + 'static) -> Self {
Self {
future: Box::pin(fut),
}
}
}
pub struct Executor {
ready_queue: VecDeque>,
}
impl Executor {
pub fn spawn(&mut self, fut: F)
where
F: Future + Send + 'static,
{
let task = Task::new(fut);
self.ready_queue.push_back(task);
}
pub fn block_on(&mut self, fut: F) -> F::Output
where
F: Future + Send + 'static,
{
let task = Task::new(fut);
self.block_run(task)
}
pub fn block_run(&self, mut task: Task) -> T {
loop {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match task.future.as_mut().poll(&mut cx) {
std::task::Poll::Ready(res) => {
return res;
}
std::task::Poll::Pending => continue,
}
}
}
pub fn run(&mut self) {
while let Some(mut task) = self.ready_queue.pop_front() {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match task.future.as_mut().poll(&mut cx) {
std::task::Poll::Ready(_) => {
println!("task is ready");
}
std::task::Poll::Pending => {
self.ready_queue.push_back(task);
}
}
}
}
}
fn main() {
let mut exec = Executor {
ready_queue: VecDeque::new(),
};
exec.spawn(async {
println!("hello from task 1");
});
let res = exec.block_on(async { 1 + 1 });
println!("block_on result:{}", res);
exec.run();
}我们来看看运行结果:
block_on result:2
hello from task 1
task is ready你也许会奇怪,为何打印顺序是反的,这是因为,在exec.spawn里面,我们把整个async{}丢到了ready_queue这里面,并且没有执行,但是block_on则是立即执行了,所以就先一步返回结果了。 目前到这里,我们给block_on实现了返回值的效果,那么如何给spawn实现返回值呢,那么就需要引入JoinHandle了,实际上,上面一个例子给了我们一个提示,spawn只有等到exec.run();运行了才能运行,因此他需要一个盒子包起来。于是乎原来的流程图变成以下的模样:
那么对应的具体实现是怎样的呢?是这样的:
use std::{
collections::VecDeque,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Waker},
};
use futures::task::noop_waker;
use std::future::Future;
pub struct Task<T> {
pub future: Pin<Box<dyn Future<Output = T> + Send + 'static>>,
}
impl<T> Task<T> {
pub fn new(fut: impl Future<Output = T> + Send + 'static) -> Self {
Self {
future: Box::pin(fut),
}
}
}
pub struct SharedState<T> {
res: Option<T>,
completed: bool,
waker: Option<Waker>,
}
impl<T> SharedState<T> {
pub fn new() -> Self {
Self {
res: None,
completed: false,
waker: None,
}
}
}
pub struct JoinHandle<T> {
shared: Arc<Mutex<SharedState<T>>>,
}
impl<T> JoinHandle<T> {}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let mut state = self.shared.lock().unwrap();
if state.completed {
let res = state
.res
.take()
.expect("JoinHandle polled after completion but no result stored");
std::task::Poll::Ready(res)
} else {
state.waker = Some(cx.waker().clone());
std::task::Poll::Pending
}
}
}
pub struct Executor {
ready_queue: VecDeque<Task<()>>,
}
impl Executor {
pub fn spawn<F, T>(&mut self, fut: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let state = SharedState::<T>::new();
let state = Arc::new(Mutex::new(state));
let state_arc = state.clone();
let join_handle: JoinHandle<T> = JoinHandle { shared: state };
let task = async move {
let out: T = fut.await;
let mut state = state_arc.lock().unwrap();
state.res = Some(out);
state.completed = true;
if let Some(w) = state.waker.take() {
w.wake();
}
};
let task = Task::new(task);
self.ready_queue.push_back(task);
join_handle
}
pub fn block_on<F, T>(&mut self, fut: F) -> F::Output
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let handle: JoinHandle<T> = self.spawn(fut);
self.run();
self.block_run(handle)
}
pub fn block_run<F, T>(&self, fut: F) -> T
where
F: Future<Output = T>,
{
let mut fut = Box::pin(fut);
loop {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match fut.as_mut().poll(&mut cx) {
std::task::Poll::Ready(res) => {
return res;
}
std::task::Poll::Pending => continue,
}
}
}
pub fn run(&mut self) {
while let Some(mut task) = self.ready_queue.pop_front() {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match task.future.as_mut().poll(&mut cx) {
std::task::Poll::Ready(_) => {
println!("task is ready");
}
std::task::Poll::Pending => {
self.ready_queue.push_back(task);
}
}
}
}
}
fn main() {
let mut exec = Executor {
ready_queue: VecDeque::new(),
};
let handle = exec.spawn(async {
println!("hello from task 1");
1 + 1
});
exec.run();
let res = exec.block_run(handle);
println!("spawn res: {}", res);
}现在基本上将spawn和block_on都支持返回值了。目前这个runtime已经较为完整了,但是我们现在看到目前的waker还是空的,那么该如何实现呢?换句话来说,我们该如何引入waker机制呢,是直接像tokio源码一样直接rawwaker嘛,我们可以简单一些:Arc<Task> + mpsc + ArcWake,从这一版开始,我们不再用 busy-loop/round-robin;Pending 的 task 不会自动回队列,必须依靠 waker 唤醒。老样子,还是流程图先上:
现在这个流程图使得实现比较复杂,所以实现之前先分一下代码文件,文件结构如下:
.
├── Cargo.lock
├── Cargo.toml
└── src
├── executor.rs
├── joinhandler.rs
├── lib.rs
├── main.rs
└── task.rs现在分好了代码结构,接下来我们继续实现:
executor.rs
use std::{
sync::{Arc, Mutex, mpsc},
task::{Context, Poll},
};
use futures::task::{noop_waker, waker_ref};
use crate::{
joinhandler::{JoinHandle, SharedState},
task::Task,
};
pub type QueueRecevier = mpsc::Receiver<Arc<Task>>;
pub type QueueSender = mpsc::Sender<Arc<Task>>;
pub struct Executor {
ready_queue: QueueRecevier,
}
pub struct Spawner {
sender: QueueSender,
}
impl Spawner {
pub fn spawn<F>(&self, fut: F)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(fut)),
sender: self.sender.clone(),
});
let _ = self.sender.send(task);
}
}
impl Executor {
pub fn new() -> (Self, Spawner) {
let (tx, rx) = mpsc::channel::<Arc<Task>>();
let exec = Self { ready_queue: rx };
let spawner = Spawner { sender: tx };
(exec, spawner)
}
pub fn run(self) {
while let Ok(task) = self.ready_queue.recv() {
let waker = waker_ref(&task);
let mut cx = Context::from_waker(&*waker);
let mut fut = task.future.lock().unwrap();
if let Poll::Pending = fut.as_mut().poll(&mut cx) {}
}
}
}
pub struct Runtime {
executor: Executor,
spawner: Spawner,
}
impl Runtime {
pub fn new() -> Self {
let (executor, spawner) = Executor::new();
Self { executor, spawner }
}
pub fn spawn<F, T>(&self, fut: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let state = SharedState::<T>::new();
let state = Arc::new(Mutex::new(state));
let state_arc = state.clone();
let join_handle: JoinHandle<T> = JoinHandle { shared: state };
let task = async move {
let out: T = fut.await;
let mut state = state_arc.lock().unwrap();
state.res = Some(out);
state.completed = true;
if let Some(w) = state.waker.take() {
w.wake();
}
};
self.spawner.spawn(task);
join_handle
}
pub fn block_on<F, T>(self, fut: F) -> F::Output
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let handle: JoinHandle<T> = self.spawn(fut);
self.run();
Self::block_run(handle)
}
pub fn block_run<F, T>(fut: F) -> T
where
F: Future<Output = T>,
{
let mut fut = Box::pin(fut);
loop {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match fut.as_mut().poll(&mut cx) {
std::task::Poll::Ready(res) => {
return res;
}
std::task::Poll::Pending => continue,
}
}
}
pub fn run(self) {
let Runtime { executor, spawner } = self;
drop(spawner);
executor.run();
}
}joinhandler.rs
use std::{
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Waker},
};
pub struct SharedState<T> {
pub res: Option<T>,
pub completed: bool,
pub waker: Option<Waker>,
}
impl<T> SharedState<T> {
pub fn new() -> Self {
Self {
res: None,
completed: false,
waker: None,
}
}
}
pub struct JoinHandle<T> {
pub shared: Arc<Mutex<SharedState<T>>>,
}
impl<T> JoinHandle<T> {}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let mut state = self.shared.lock().unwrap();
if state.completed {
let res = state
.res
.take()
.expect("JoinHandle polled after completion but no result stored");
std::task::Poll::Ready(res)
} else {
state.waker = Some(cx.waker().clone());
std::task::Poll::Pending
}
}
}task.rs
use std::{pin::Pin, sync::Mutex};
use futures::task::ArcWake;
use crate::executor::QueueSender;
pub struct Task {
pub future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
pub sender: QueueSender,
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &std::sync::Arc<Self>) {
let _ = arc_self.sender.send(arc_self.clone());
}
}main.rs
use custom_runtime::executor::Runtime;
fn main() {
let exec = Runtime::new();
let handle = exec.spawn(async {
println!("hello from task 1");
1 + 1
});
exec.run();
let res = Runtime::block_run(handle);
println!("spawn res: {}", res);
}以上是流程图的基本实现,至此,整个mini runtime我们实现完成了,虽然说分割出来的文件只有三个,其实仔细发现,我们几乎将前面七个要素都实现了,不过具体在哪里,就留给读者思考了。看到这里,我留给读者一个问题,如何在这个runtime里面,实现一个Sleep函数呢? 到这里,我们用不到 200 行代码把前面那四个关键词串起来了:poll 推进、调度循环、waker 唤醒、JoinHandle 结果回传。剩下的 IO/Timer/park,本质是在同一个框架上挂“事件源”。
总结
在这篇文章中,我们从 Rust异步的最小抽象单元 Future 出发,一路拆解到完整异步运行时的核心结构,逐步回答了一个看似简单、但经常被模糊处理的问题:一个 async runtime 到底在做什么? 我们首先澄清了一个常见误区:async/await 并不等于并发,也不等于异步执行,决定其并发模型的,是async runtime内部的实现。就像文章中的tokio和async-std 是一个典型的async runtime,但是在实现这里,我们只是用了简单的单线程和队列循环,所以说实际上async fn 在编译期只是被降级成一个可被 poll 推进的状态机,真正决定它“何时运行、由谁运行、如何运行”的,是 runtime。
围绕这一点,文章从四个根本问题展开:
- Future 的状态如何被推进(
poll → Pending / Ready) - 谁在调度这些 poll(Executor / Scheduler)
- Future 如何在等待事件时“挂起”,又如何被重新唤醒(Waker)
- 外部事件(I/O / Timer)如何接入这个模型(Driver)
在此基础上,我们对比分析了两种主流 Rust runtime —— Tokio 与 async-std —— 它们在宏观结构上高度一致,但在 调度策略、线程模型、全局/局部 runtime 设计 等关键点上做出了不同取舍。这也解释了一个重要结论:runtime 不是语法层能力,而是一套完整的工程系统,因此 Tokio 与 async-std 不能被随意混用。
最后,通过“自己动手实现一个 mini runtime”的小节,我们把前面所有抽象压缩成一条最小可运行路径:
Task如何封装FutureExecutor如何驱动任务JoinHandle如何返回结果Waker如何让任务从 Pending 状态重新进入调度队列 这个实现探索出来runtime的基本元素,并由此证明:
Tokio 的复杂度不是魔法,而是在同一套基础模型上不断叠加出来的工程结果。
至此,我们已经回答了“runtime 是什么、为什么需要它、它的最小形态是什么”。那么下一节,我们将回到应用层,通过一系列实战,说明为何tokio在异步非阻塞环境下,具有极高性能。